並行システム システム情報系/情報工学域, システム情報工学研究群/情報理工学位プログラム システム情報工学研究科/コンピュータサイエンス専攻 新城 靖 <yas@cs.tsukuba.ac.jp>
このページは、次の URL にあります。
https://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25
あるいは、次のページから手繰っていくこともできます。
https://www.cs.tsukuba.ac.jp/~yas/cs/
https://www.cs.tsukuba.ac.jp/~yas/
図? シングルスレッドのプロセスとマルチスレッドのプロセス / A process with a single thread and a process with multiple threads
軽量プロセスというと、内部にループを含むような語感がある。 A lightweight process can contain a loop in it.
マルチスレッドプログラミングは、非常に難しい。
どこでスレッドを使うべきか。
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
Per Brinch Hansen: "The programming language Concurrent Pascal," IEEE Transactions on Software Engineering, Vol.SE-1, No.2, pp.199-207, June 1975.
type procA_t = process(args...); var local variables... procedure proc1(args...); procedure proc2(args...); begin cycle ... end; end var procA1 : procA_t ; init procA1(args...);
type monA_t = monitor(args ・・・); var loal variables procedure entry proc1(args ・・・); procedure entry proc2(args ・・・); begin initialization of local variables; end var monA1 : monA_t ; init monA1(引数);ローカル変数は、entry の手続きでしかアクセスできない。 1 つのプロセスが entry の手続きを実行中は、他のプロセスは入れない(mutual exclusion)。
図? Concurrent Pascalのプロセスとモニタ/Processes and a monitor in Concurrent Pascal
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。block the current process. cv1.signal; 待っているプロセスがいれば全て再開させる。unblock all waiting processes.
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 block the current process. continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。 unblock a single process in the queue.
$ producer | consumer
2つのスレッドの間には、バッファを置く。
図? 有限バッファ(環状バッファ)、生産者、消費者 / bounded buffer (circular buffer), producer, consumer
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き procedures
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: wp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: not_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。
#include <pthread.h>
Linux, MacOSX, でのコンパイルとリンク。-lpthread
を付ける
(ことになっているが、付けなくても良いことが多い)。
% cc -o create create.c -lpthread
% ./create
...
%
Solaris (Unix International系)でのコンパイルとリンク。
-D_REENTRANT
と-lpthread
を付ける。
% cc -D_REENTRANT -o create create.c -lpthread
% ./create
...
%
セマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に -lposix4
オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt
オプションを付ける。
図? fork-joinモデルの実現
後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)
1: 2: /* 3: create-join-2.c -- スレッドを2つ作るプログラム 4: */ 5: 6: #include <stdlib.h> /* malloc() */ 7: #include <stdio.h> /* printf() */ 8: #include <pthread.h> 9: 10: struct func1_arg { 11: int x; 12: }; 13: struct func2_arg { 14: int x; 15: }; 16: 17: void func1( struct func1_arg *arg ); 18: void func2( struct func2_arg *arg ); 19: 20: int main() 21: { 22: pthread_t t1 ; 23: pthread_t t2 ; 24: struct func1_arg *arg1; 25: struct func2_arg *arg2; 26: 27: printf("main()\n"); 28: arg1 = malloc( sizeof(struct func1_arg) ); 29: if( arg1 == NULL ) 30: { 31: perror("no memory for arg1"); 32: exit( 1 ); 33: } 34: arg1->x = 10; 35: pthread_create( &t1, NULL, (void *)func1, (void *)arg1 ); 36: arg2 = malloc( sizeof(struct func2_arg) ); 37: if( arg2 == NULL ) 38: { 39: perror("no memory for arg2"); 40: exit( 1 ); 41: } 42: arg2->x = 20; 43: pthread_create( &t2, NULL, (void *)func2, (void *)arg2 ); 44: pthread_join( t1, NULL ); 45: pthread_join( t2, NULL ); 46: return( 0 ); 47: } 48: 49: void func1( struct func1_arg *arg ) 50: { 51: int i ; 52: for( i = 0 ; i<3 ; i++ ) 53: { 54: printf("func1( %d ): %d \n",arg->x, i ); 55: } 56: } 57: 58: void func2( struct func2_arg *arg ) 59: { 60: int i ; 61: for( i = 0 ; i<3 ; i++ ) 62: { 63: printf("func2( %d ): %d \n",arg->x, i ); 64: } 65: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/create-join-2.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/Makefile
% make create-join-2
gcc -c -o create-join-2.o create-join-2.c
gcc create-join-2.o -lpthread -o create-join-2
% ./create-join-2
main()
func1( 10 ): 0
func2( 20 ): 0
func1( 10 ): 1
func2( 20 ): 1
func1( 10 ): 2
func2( 20 ): 2
% ./create-join-2
main()
func1( 10 ): 0
func1( 10 ): 1
func1( 10 ): 2
func2( 20 ): 0
func2( 20 ): 1
func2( 20 ): 2
% ./create-join-2
main()
func1( 10 ): 0
func1( 10 ): 1
func1( 10 ): 2
func2( 20 ): 0
func2( 20 ): 1
func2( 20 ): 2
%
この例では、次の3つのスレッドが作られる。Three threads were created.
func1
から作られたスレッド t1
func2
から作られたスレッド t2
pthread_create()
により作られる。
どういう順序で実行されるかは、決まっていない(nondeterministic)。 スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても順序を決めたければ、 mutex や条件変数といった同期機能を使う。
pthread_create()
で指定された関数からリターンすると、そ
のスレッドが終了する。pthread_exit()
を呼び出してもよい。
ただし、
初期スレッド
が終了すると、プロセス全体が終了する。
exit()
システムコールを呼び出しても終了する。
プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。
主記憶やCPU時間といった資源は、横取り(preemption)しても平気なので、あま り問題になりない。
スレッドの場合は、プログラミング言語の変数(variables)。
変更されない変数(immutable variables)の値を読む(read-only)だけなら、特 に問題は起きない。それ以外の時、特に、複数のスレッドで値を読み書きする 時に問題が起きる。
1: 2: /* 3: * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム 4: */ 5: 6: #include <stdio.h> 7: #include <pthread.h> 8: 9: void thread_A(), thread_B(); 10: int shared_resource ; 11: 12: int main() { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: shared_resource = 0 ; 16: pthread_setconcurrency( 2 ); 17: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 18: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 19: pthread_join( t1, NULL ); 20: pthread_join( t2, NULL ); 21: printf("main(): shared_resource == %d\n", shared_resource ); 22: return( 0 ); 23: } 24: 25: void thread_A() 26: { 27: int i, x ; 28: for( i = 0 ; i<1000000 ; i++ ) 29: { 30: x = shared_resource ; 31: x = x + 1 ; 32: shared_resource = x ; 33: } 34: } 35: 36: void thread_B() { 37: int i, x ; 38: for( i = 0 ; i<1000000 ; i++ ) { 39: x = shared_resource ; 40: x = x + 1 ; 41: shared_resource = x ; 42: } 43: }
pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/mutex-nolock.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/Makefile
% make mutex-nolock
gcc -c -o mutex-nolock.o mutex-nolock.c
gcc mutex-nolock.o -lpthread -o mutex-nolock
% ./mutex-nolock
main(): shared_resource == 1014838
% ./mutex-nolock
main(): shared_resource == 1062224
% ./mutex-nolock
main(): shared_resource == 1096882
% ./mutex-nolock
main(): shared_resource == 1091488
% ./mutex-nolock
main(): shared_resource == 1096291
%
thread_A()
, thread_B()
と2つのスレッドが作
られている。整数型の変数 shared_resource
は、その2つの
スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予
定だが、実際に実行すると、そうはならない。実行する度に答えが違う。
単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。
このプログラムが SMP で動いているとして、動きを 考えてえる。
thread_A() thread_B() : : : : 30: x = shared_resource ; 39: x = shared_resource ; 31: x = x + 1 ; 40: x = x + 1 ; 32: shared_resource = x ; 41: shared_resource = x ; : : : :
shared_resource++
と書いてもだめ。複数の機械語命令に分割
されるので。
相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つ(at most one)にする。
プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。
Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。
pthread_mutex_t mutex1; pthread_mutex_init( &mutex1, NULL ); : : pthread_mutex_lock( &murex1 ); <際どい部分。critical section.> pthread_mutex_unlock( &mutex1 );
1: /* 2: * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム 3: */ 4: 5: #include <stdio.h> 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: int main() { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: shared_resource = 0 ; 16: pthread_mutex_init( &mutex1, NULL ); 17: pthread_setconcurrency( 2 ); 18: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 19: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 20: pthread_join( t1, NULL ); 21: pthread_join( t2, NULL ); 22: printf("main(): shared_resource == %d\n", shared_resource ); 23: return( 0 ); 24: } 25: 26: void thread_A() 27: { 28: int i, x ; 29: for( i = 0 ; i<1000000 ; i++ ) 30: { 31: pthread_mutex_lock( &mutex1 ); 32: x = shared_resource ; 33: x = x + 1 ; 34: shared_resource = x ; 35: pthread_mutex_unlock( &mutex1 ); 36: } 37: } 38: 39: void thread_B() { 40: int i, x ; 41: for( i = 0 ; i<1000000 ; i++ ) { 42: pthread_mutex_lock( &mutex1 ); 43: x = shared_resource ; 44: x = x + 1 ; 45: shared_resource = x ; 46: pthread_mutex_unlock( &mutex1 ); 47: } 48: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/mutex-lock.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/Makefile
% make mutex-lock
gcc -c -o mutex-lock.o mutex-lock.c
gcc mutex-lock.o -lpthread -o mutex-lock
% ./mutex-lock
main(): shared_resource == 2000000
% ./mutex-lock
main(): shared_resource == 2000000
% ./mutex-lock
main(): shared_resource == 2000000
%
thread_A() thread_B() : : 31: pthread_mutex_lock( &mutex1 ); : 42: pthread_mutex_lock( &mutex1 ); 32: x = shared_resource ; <他のスレッドが実行中なので 33: x = x + 1 ; この状態でしばらく待たされる> 34: shared_resource = x ; : 35: pthread_mutex_unlock( &mutex1 ); : : <実行再開> : 43: x = shared_resource ; 44: x = x + 1 ; 45: shared_resource = x ; 46: pthread_mutex_unlock( &mutex1 );後から来た
thread_B()
は、他のスレッドが実行中は、待たさ
れる。
thread_A | thread_B
2つのスレッドの間には、
有限バッファ(bounded buffer)を置く。
バッファが空の時、thread_B() は、thread_A() が何かデータをバッファに入 れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッ ファから何かデータを取り出すのを待つ。
条件変数の操作 operations:
1: 2: /* 3: * condv-buffer.c -- 条件変数を使った環状バッファ 4: */ 5: 6: #include <stdio.h> /* printf() */ 7: #include <stdlib.h> /* malloc(), exit() */ 8: #include <pthread.h> 9: 10: #define BUFFER_SIZE 4 /* バッファの大きさ */ 11: struct circular_buffer 12: { 13: int rp ; /* 読み出す位置 */ 14: int wp ; /* 書き込む位置 */ 15: int used ; /* バッファ内の要素数 */ 16: int data[BUFFER_SIZE]; /* データを保存する場所 */ 17: pthread_mutex_t mutex ; /* この構造体の相互排除のための mutex */ 18: pthread_cond_t not_full ; /* バッファが一杯ではない状態を待つための条件変数 */ 19: pthread_cond_t not_empty ; /* バッファが空ではない状態を待つための条件変数 */ 20: }; 21: 22: void thread_A(struct circular_buffer *b); 23: void thread_B(struct circular_buffer *b); 24: 25: void put( struct circular_buffer *b,int x ) 26: { 27: pthread_mutex_lock( &b->mutex ); 28: loop: if( b->used == BUFFER_SIZE ) 29: { 30: pthread_cond_wait( &b->not_full,&b->mutex ); 31: goto loop; 32: } 33: b->data[ b->wp++ ] = x ; 34: if( b->wp >= BUFFER_SIZE ) 35: b->wp = 0 ; 36: b->used ++ ; 37: pthread_cond_broadcast( &b->not_empty ); 38: pthread_mutex_unlock( &b->mutex ); 39: } 40: 41: int get( struct circular_buffer *b ) 42: { 43: int x ; 44: pthread_mutex_lock( &b->mutex ); 45: loop: if( b->used == 0 ) 46: { 47: pthread_cond_wait( &b->not_empty,&b->mutex ); 48: goto loop; 49: } 50: x = b->data[ b->rp++ ] ; 51: if( b->rp >= BUFFER_SIZE ) 52: b->rp = 0 ; 53: b->used -- ; 54: pthread_cond_broadcast( &b->not_full ); 55: pthread_mutex_unlock( &b->mutex ); 56: return( x ); 57: } 58: 59: int main() 60: { 61: pthread_t t1 ; 62: pthread_t t2 ; 63: struct circular_buffer *b ; 64: b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer)); 65: if( b == NULL ) 66: { 67: perror("no memory for struct buffer\n"); 68: exit( -1 ); 69: } 70: b->rp = 0 ; 71: b->wp = 0 ; 72: b->used = 0 ; 73: pthread_mutex_init( &b->mutex, NULL ); 74: pthread_cond_init( &b->not_full,NULL ); 75: pthread_cond_init( &b->not_empty,NULL ); 76: pthread_setconcurrency( 2 ); 77: pthread_create( &t1, NULL, (void *)thread_A, (void *)b ); 78: pthread_create( &t2, NULL, (void *)thread_B, (void *)b ); 79: pthread_join( t1, NULL ); 80: pthread_join( t2, NULL ); 81: return( 0 ); 82: } 83: 84: void thread_A( struct circular_buffer *b ) /* producer */ 85: { 86: int i,x ; 87: for( i = 0 ; i<10 ; i++ ) 88: { 89: x = i ; 90: printf("thread_A(): put( %d )\n",x ); 91: put( b,x ); 92: } 93: } 94: 95: void thread_B( struct circular_buffer *b ) /* consumer */ 96: { 97: int i, x ; 98: for( i = 0 ; i<10 ; i++ ) 99: { 100: x = get( b ); 101: printf("thread_B(): get() %d.\n", x ); 102: } 103: }
put()
は、バッファにデータを追加する時に使う手続き。
基本的には、入口で pthread_mutex_lock()
し、
出口で pthread_mutex_unlock()
する。
バッファが一杯の時には、条件変数b->not_full
で、
一杯でないという条件になるまで待つ。
待っている間は、mutex のロックは解除される。
pthread_cond_wait()
からリターンして来る時には、もう一度
ロックされた状態に戻る。
pthread_cond_wait()
でスリープしてある間に、
変数
(rp,wp,data
)が書き換えられている可能性があるので、
もう一度最初から調べる。
get()
は、バッファからデータを取り出す時に使う手
続き。put()
とほぼ対称形。バッファが空の時に、wait
し、バッファがもはや一杯ではないことをbroadcast する。
thread_A()
は、10回バッファにデータを書き込むスレッド。
thread_B()
は逆に、10回バッファからデータを読み出すス
レッド。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/condv-buffer.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/Makefile
% make condv-buffer
gcc -c -o condv-buffer.o condv-buffer.c
gcc condv-buffer.o -lpthread -o condv-buffer
% ./condv-buffer
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_B(): get() 0.
thread_A(): put( 2 )
thread_B(): get() 1.
thread_A(): put( 3 )
thread_B(): get() 2.
thread_A(): put( 4 )
thread_B(): get() 3.
thread_A(): put( 5 )
thread_B(): get() 4.
thread_A(): put( 6 )
thread_B(): get() 5.
thread_A(): put( 7 )
thread_B(): get() 6.
thread_A(): put( 8 )
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
% ./condv-buffer
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_B(): get() 0.
thread_A(): put( 2 )
thread_B(): get() 1.
thread_A(): put( 3 )
thread_B(): get() 2.
thread_A(): put( 4 )
thread_B(): get() 3.
thread_A(): put( 5 )
thread_B(): get() 4.
thread_A(): put( 6 )
thread_B(): get() 5.
thread_A(): put( 7 )
thread_B(): get() 6.
thread_A(): put( 8 )
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
% ./condv-buffer
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_B(): get() 0.
thread_A(): put( 3 )
thread_B(): get() 1.
thread_A(): put( 4 )
thread_B(): get() 2.
thread_A(): put( 5 )
thread_B(): get() 3.
thread_A(): put( 6 )
thread_B(): get() 4.
thread_A(): put( 7 )
thread_B(): get() 5.
thread_A(): put( 8 )
thread_B(): get() 6.
thread_A(): put( 9 )
thread_B(): get() 7.
thread_B(): get() 8.
thread_B(): get() 9.
%
printf() の順番とput(), get() の順番は違うことがある。
1つのスレッドに着目した時には順序は同じ。
pthread_mutex_lock( &mutex ); while( 1 ) { if( condition_checking() ) pthread_cond_wait( &cv, &mutex ); perform(); }
バッファに要素を「1つずつ」追加しているので、
pthread_cond_signal()
でもよい。
pthread_cond_broadcast()
に変えても動くようにプログラムを
作る。
pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。
「1つずつ」ではなく、複数個同時に読み書きする時には、
pthread_cond_broadcast()
でないとだめ。
迷った時には、pthread_cond_broadcast()
。
図? 開いたモジュールと閉じたモジュール
1: /* 2: * mutex-reclock-normal.c -- 通常の mutex を使う例(デッドロック) 3: */ 4: 5: #include <stdio.h> /* printf() */ 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: void deposit( int n ) 13: { 14: pthread_mutex_lock( &mutex1 ); 15: shared_resource += n ; 16: pthread_mutex_unlock( &mutex1 ); 17: } 18: 19: void add_interest() 20: { 21: int i ; 22: pthread_mutex_lock( &mutex1 ); 23: i = shared_resource * 0.05 ; 24: deposit( i ); 25: pthread_mutex_unlock( &mutex1 ); 26: } 27: 28: int main() { 29: pthread_t t1 ; 30: pthread_t t2 ; 31: shared_resource = 1000000 ; 32: pthread_mutex_init( &mutex1, NULL ); 33: 34: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 35: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 36: pthread_join( t1, NULL ); 37: pthread_join( t2, NULL ); 38: printf("main(): shared_resource == %d\n", shared_resource ); 39: return( 0 ); 40: } 41: 42: void thread_A() 43: { 44: printf("thread_A(): deposit( 10000 ) ... \n"); 45: deposit( 10000 ); 46: printf("thread_A(): deposit( 10000 ) done. \n"); 47: } 48: 49: void thread_B() 50: { 51: printf("thread_B(): add_interest() ... \n"); 52: add_interest(); 53: printf("thread_B(): add_interest() done. \n"); 54: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/mutex-reclock-normal.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/Makefile
% make mutex-reclock-normal
gcc -c -o mutex-reclock-normal.o mutex-reclock-normal.c
gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal
% ./mutex-reclock-normal
thread_A(): deposit( 10000 ) ...
thread_B(): add_interest() ...
thread_A(): deposit( 10000 ) done.
^C (強制終了)
%
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。
#CFLAGS= -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NPこの結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c
1: /* 2: * mutex-reclock-recursive.c -- 再帰的 mutex を使う例 3: */ ... 28: static int 29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex ) 30: { 31: pthread_mutexattr_t attr ; 32: int err ; 33: if( (err=pthread_mutexattr_init( &attr )) < 0 ) 34: return( 0 ); 35: if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 ) 36: return( 0 ); 37: err = pthread_mutex_init( mutex,&attr ); 38: return( err ); 39: } 40: 41: int main() 42: { 43: pthread_t t1 ; 44: pthread_t t2 ; 45: shared_resource = 1000000 ; 46: my_pthread_mutex_init_recursive( &mutex1 ); 47: 48: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 49: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 50: pthread_join( t1, NULL ); 51: pthread_join( t2, NULL ); 52: printf("main(): shared_resource == %d\n", shared_resource ); 53: return( 0 ); 54: }実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/mutex-reclock-recursive.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/Makefile
% make mutex-reclock-recursive
gcc -c -o mutex-reclock-recursive.o mutex-reclock-recursive.c
gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive
% ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ...
thread_A(): deposit( 10000 ) done.
thread_B(): add_interest() ...
thread_B(): add_interest() done.
main(): shared_resource == 1060500
% ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ...
thread_A(): deposit( 10000 ) done.
thread_B(): add_interest() ...
thread_B(): add_interest() done.
main(): shared_resource == 1060500
%
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。
#CFLAGS= -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NPこの結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c
printf("hello,world\n");上と下は、結果が違う。
printf("hello,"); /* ここに他のスレッドの出力が混じることがある。 Other threads can print something here. */ printf("world\n");これを避けるには、
flockfile(),funlockfile(),ftrylockfile()
を使う。
flockfile(stdout); printf("hello,"); /* ここに他のスレッドの出力が混じることはない Other threads cannot print something here. */ printf("world\n"); funlockfile(stdout);
putchar()
や
getchar()
は、遅すぎる。
flockfile()/funlockfile()
の中で使うための
putchar_unlocked(),getchar_unlocked(),putc_unlocked(),getc_unlocked()
が用意されている。printf_unsafe()
はない。
<−>再帰呼出し
スレッド間でポインタを渡す時には、スレッドの寿命にも注意。
シングルスレッドのプログラムでは、static変数は、プログラムのモジュール 性(modularity) を高めるために有効に使われてきた。
マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。
gethostbyname()
は、
static変数に値をセットして、その番地を返す。
struct hostent *gethostbyname( char *name ){ static struct hostent ret ; ..... return( &ret ); }複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。
externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。
別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。
Sun のマニュアルより: struct hostent *gethostbyname(const char *name); struct hostent *gethostbyname_r(const char *name, struct hostent *result, char *buffer, int buflen, int *h_errnop);
次のような関数が利用可能である。
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
int sem_wait(sem_t * sem)
int sem_trywait(sem_t * sem)
sem_wait()
。0でも止まらずエラーを返す。
int sem_post(sem_t * sem)
int sem_getvalue(sem_t * sem, int * sval)
int sem_destroy(sem_t * sem)
注意:名前付きのセマフォもある。sem_open() で作成/初期化し、 sem_unlink() で削除する。
注意:Solaris には、POSIX のセマフォとは別に、カーネル内でのデバイス・ ドライバ作成のためのセマフォが用意されている。
Java には、最初から言語のレベルでスレッドの機能が入っている。
Java | Pthread |
new Thread(); start(); | pthread_create() |
join() | pthread_join() |
synchronized | pthread_mutex_lock()とpthread_mutex_unlock()の組 |
wait() | pthread_cond_wait() |
wait(long timeout) | pthread_cond_timedwait() |
notify() | pthread_cond_signal() |
notifyAll() | pthread_cond_broadcast() |
Java の synchronized は、再帰可能(recursive)。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。
Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。
Pthreadでの有限バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。
Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。
1: 2: /* 3: * CircularBuffer.java -- Java による環状バッファ 4: */ 5: 6: class CircularBuffer 7: { 8: static final int BUFFER_SIZE = 4 ; 9: int rp ; // 読み出す位置 10: int wp ; // 書き込む位置 11: int data[]; // データを保存する場所 12: int used ; // バッファ内の要素数 13: CircularBuffer() 14: { 15: data = new int[BUFFER_SIZE]; 16: rp = 0 ; 17: wp = 0 ; 18: used = 0; 19: } 20: 21: public synchronized void put( int x ) throws InterruptedException 22: { 23: while( used == BUFFER_SIZE ) 24: wait(); 25: data[ wp++ ] = x ; 26: if( wp == BUFFER_SIZE ) 27: wp = 0 ; 28: used++ ; 29: notifyAll(); 30: } 31: public synchronized int get() throws InterruptedException 32: { 33: int x ; 34: while( used == 0 ) 35: wait(); 36: x = data[ rp++ ] ; 37: if( rp >= BUFFER_SIZE ) 38: rp = 0 ; 39: used--; 40: notifyAll(); 41: return( x ); 42: } 43: }
1: 2: /* 3: * CircularBufferDemo.java -- Java による有限バッファのデモ 4: */ 5: 6: class Thread_A implements Runnable // Producer 7: { 8: CircularBuffer b; 9: Thread_A( CircularBuffer b ) 10: { 11: this.b = b; 12: } 13: public void run() 14: { 15: int i,x ; 16: for( i = 0 ; i<10 ; i++ ) 17: { 18: try 19: { 20: x = i ; 21: System.out.println("Thread_A(): put( "+x+" )"); 22: b.put( x ); 23: } 24: catch( InterruptedException e ) 25: { 26: System.err.println("Thread_A(): Interrupted"); 27: break; 28: } 29: } 30: } 31: } 32: 33: class Thread_B implements Runnable // Producer 34: { 35: CircularBuffer b; 36: Thread_B( CircularBuffer b ) 37: { 38: this.b = b; 39: } 40: public void run() 41: { 42: int i,x ; 43: for( i = 0 ; i<10 ; i++ ) 44: { 45: try 46: { 47: x = b.get(); 48: System.out.println("Thread_B(): got() "+x+"."); 49: } 50: catch( InterruptedException e ) 51: { 52: System.err.println("Thread_B(): Interrupted"); 53: break; 54: } 55: } 56: } 57: } 58: 59: class CircularBufferDemo 60: { 61: public static void main(String argv[]) 62: { 63: final CircularBuffer b = new CircularBuffer(); 64: Thread t1 = new Thread( new Thread_A(b) ); 65: t1.start(); 66: Thread t2 = new Thread( new Thread_B(b) ); 67: t2.start(); 68: try 69: { 70: t1.join(); 71: t2.join(); 72: } 73: catch( InterruptedException e ) 74: { 75: System.err.println("main(): Interrupted"); 76: } 77: } 78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/CircularBuffer.java
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/CircularBufferDemo.java
% javac CircularBuffer.java CircularBufferDemo.java
% java CircularBufferDemo
Thread_A(): put( 0 )
Thread_A(): put( 1 )
Thread_A(): put( 2 )
Thread_A(): put( 3 )
Thread_A(): put( 4 )
Thread_B(): got() 0.
Thread_B(): got() 1.
Thread_B(): got() 2.
Thread_B(): got() 3.
Thread_B(): got() 4.
Thread_A(): put( 5 )
Thread_A(): put( 6 )
Thread_B(): got() 5.
Thread_A(): put( 7 )
Thread_A(): put( 8 )
Thread_A(): put( 9 )
Thread_B(): got() 6.
Thread_B(): got() 7.
Thread_B(): got() 8.
Thread_B(): got() 9.
%
Ruby には、標準でスレッドの機能が入っているが、標準ではGiant VM lock (GVL)/Global Interpreter Lock (GIL) を保持して実行されるので、並列に実行されない。ただし、入出力を行 うと GVL をリリースするので、CPU 処理と複数の入出力処理を重ね合わること はできる。Parallel 等の拡張ライブラリを使うと、CPU処理を並列に実行する こともできる。
Ruby | Pthread |
Thread.new() { } | pthread_create() |
Thread#join() | pthread_join() |
Mutex#synchronize {} | pthread_mutex_lock(), pthread_mutex_unlock() |
Mutex#lock() Mutex#unlock() | |
ConditionVariable#wait(mutex) | pthread_cond_wait() |
ConditionVariable#wait(mutex,timeout) | pthread_cond_timedwait() |
ConditionVariable#signal() | pthread_cond_signal() |
ConditionVariable#broadcast() | pthread_cond_broadcast() |
Ruby の Mutex は、再帰可能ではない。再帰可能なもの、 PTHREAD_MUTEX_RECURSIVE 相当のものが必要ならば、Mutex ではなく Monitor/Sync を使う。
Pthreads のプログラムを、Mutex/Monitor/Sync と ConditionVariable を使っ て同じロジックで Ruby のプログラムに書き直せる。
1: #!/usr/bin/env ruby 2: # -*- coding: utf-8 -*- 3: # condv-buffer-ruby.rb -- Circular Buffer in Ruby 4: 5: require 'thread' 6: 7: class CircularBuffer 8: BUFFER_SIZE = 4 9: def initialize() 10: @data = Array.new(BUFFER_SIZE) 11: @rp = 0 12: @wp = 0 13: @used = 0 14: @mutex = Mutex.new() 15: @not_full = ConditionVariable.new() 16: @not_empty = ConditionVariable.new() 17: end 18: def put( x ) 19: @mutex.synchronize { 20: while( @used == BUFFER_SIZE ) 21: @not_full.wait( @mutex ) 22: end 23: @data[ @wp ] = x 24: @wp += 1 25: if( @wp >= BUFFER_SIZE ) 26: @wp = 0 27: end 28: @used += 1 29: @not_empty.broadcast() 30: } 31: end 32: def get() 33: @mutex.synchronize { 34: while( @used == 0 ) 35: @not_empty.wait( @mutex ) 36: end 37: x = @data[ @rp ] 38: @rp += 1 39: if( @rp >= BUFFER_SIZE ) 40: @rp = 0 41: end 42: @used -= 1 43: @not_full.broadcast() 44: return( x ) 45: } 46: end 47: end 48: 49: def main() 50: b = CircularBuffer.new() 51: t1 = Thread.new() { 52: thread_A( b ) 53: } 54: t2 = Thread.new() { 55: thread_B( b ) 56: } 57: t1.join() 58: t2.join() 59: end 60: 61: def thread_A( b ) # producer 62: i = 0 63: while( i < 10 ) 64: x = i 65: printf("thread_A(): put( %d )\n",x ) 66: b.put( x ); 67: i += 1 68: end 69: end 70: 71: def thread_B( b ) # consumer 72: i = 0 73: while( i < 10 ) 74: x = b.get() 75: printf("thread_B(): got() %d.\n", x ); 76: i += 1 77: end 78: end 79: 80: main()
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/condv-buffer-ruby.rb
% ruby condv-buffer-ruby.rb
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): got() 0.
thread_B(): got() 1.
thread_B(): got() 2.
thread_B(): got() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_A(): put( 9 )
thread_B(): got() 4.
thread_B(): got() 5.
thread_B(): got() 6.
thread_B(): got() 7.
thread_B(): got() 8.
thread_B(): got() 9.
%
Actor モデルについては、この授業の後半で述べる。
Python には、標準でスレッドの機能が入っているが、標準では Global Interpreter Lock (GIL) を保持して実行されるので、並列に実行されない。ただし、入出力を行 うと GIL をリリースするので、CPU 処理と複数の入出力処理を重ね合わること はできる。
Python | Pthread |
threading.Thread() | pthread_create() |
threading.Thread() join() | pthread_join() |
threading.Lock() acquire() | pthread_mutex_lock() |
threading.Lock() release() | pthread_mutex_unlock() |
with lock: | pthread_mutex_lock(), pthread_mutex_unlock() |
threading.Condition(mutex) wait() | pthread_cond_wait() |
threading.Condition(mutex) wait(timeout=to) | pthread_cond_timedwait() |
threading.Condition(mutex) notify() | pthread_cond_signal() |
threading.Condition(mutex) notify_all() | pthread_cond_broadcast() |
Python の Lock は、再帰可能ではない。再帰可能なもの、 PTHREAD_MUTEX_RECURSIVE 相当のものが必要ならば、 Lock ではなく RLock を使う。
Pthreads のプログラムを、Lock/RLock と Condition を使っ て同じロジックで Python のプログラムに書き直せる。
1: #!/usr/bin/env python3 2: # -*- coding: utf-8 -*- 3: # condv-buffer-python.py -- Circular Buffer in Python 4: 5: import threading 6: 7: class CircularBuffer(): 8: def __init__(self): 9: self.BUFFER_SIZE = 4 10: # self.data = [0 for i in range(self.BUFFER_SIZE)] 11: self.data = [None] * self.BUFFER_SIZE 12: self.rp = 0 13: self.wp = 0 14: self.used = 0 15: self.mutex = threading.Lock() 16: self.not_full = threading.Condition(self.mutex) 17: self.not_empty = threading.Condition(self.mutex) 18: 19: def put( self, x ): 20: with self.mutex: 21: while self.used == self.BUFFER_SIZE: 22: self.not_full.wait() 23: self.data[ self.wp ] = x 24: self.wp += 1 25: if self.wp >= self.BUFFER_SIZE : 26: self.wp = 0 27: self.used += 1 28: self.not_empty.notify_all() 29: 30: def get( self ): 31: with self.mutex: 32: while self.used == 0: 33: self.not_empty.wait() 34: x = self.data[ self.rp ] 35: self.rp += 1 36: if self.rp >= self.BUFFER_SIZE : 37: self.rp = 0 38: self.used -= 1 39: self.not_full.notify_all() 40: return( x ) 41: 42: def main(): 43: b = CircularBuffer() 44: t1 = threading.Thread(target=thread_A,args=(b,)) 45: t1.start() 46: t2 = threading.Thread(target=thread_B,args=(b,)) 47: t2.start() 48: t1.join() 49: t2.join() 50: 51: def thread_A(b): # producer 52: for i in range(10): 53: x = i 54: print("thread_A(): put( {0} )".format(x) ) 55: b.put( x ); 56: 57: def thread_B(b): # consumer 58: for i in range(10): 59: x = b.get() 60: print("thread_B(): got() {0}.".format(x) ); 61: 62: main()
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2025/2025-04-25/ex/condv-buffer-python.py
% python3 condv-buffer-python.py
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_B(): got() 0.
thread_B(): got() 1.
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_B(): got() 2.
thread_B(): got() 3.
thread_B(): got() 4.
thread_B(): got() 5.
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_A(): put( 9 )
thread_B(): got() 6.
thread_B(): got() 7.
thread_B(): got() 8.
thread_B(): got() 9.
%
Today, you do not have to answer all the following exercises. See the report page for details.
The program of bounded buffer uses two condition variables. Rewrite this program to use a single condition variables. Since a thread can block either on put() or get(), we can share a single queue of a condition variable.
Add following two functions to the program of recursive mutex . The first function returns the balance of the bank account. The second function withdraws the given amount of money from the bank account. If the bank account does not have sufficient money, this function should nothing. You may use the programming language C, Java, Ruby, Python, Go, Rust or Kotlin.
int balance() { ... // return the balance 残高を返す } int withdraw( int n ) { ... // return 1 if succeeded. 成功の時は 1 を返す // return 0 if failed due to insufficient money. 残高不足の時は 0 }
作成した有限バッファに対して複数のスレッドがput(n,x[]) した時、x[] の要 素がインターリーブされることがあるか? たとえば、2つのスレッドが次のよう な操作をしたとする。 When multiple threads calls the functin put(n,x[]), can the items in each x[] be interleved or not? For example, consider the following actions.
Rewrite the bounded buffer programs (in C with Pthread Java, Ruby or Python ) to use semaphores.
プログラミング言語としては、C、Java、Ruby、Python、Go、Rust、または Kotlin を使いなさい。 You may use the programming language C, Java, Ruby, Python, Go, Rust or Kotlin.
セマフォを使った循環バッファのプログラムの考え方は、 Wikipediaのセマフォのページ にも記載されている。 3つの計数セマフォ(counting semaphore)が使われていることに注意しなさい。 The overview of the bounded buffer program with semaphores is described in the the Wikipedia page of Semaphore (programming). Note that this program uses three counting semaphores.