並行システム システム情報系情報工学域, システム情報工学研究科コンピュータサイエンス専攻 新城 靖 <yas@cs.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/
科目番号
成績の付け方(新城担当の5回分)。次のものを合算する。
main() { printf("hello, world!\n"); }逐次プログラムでは、1度に1つの手続き(関数)(procedure(function))しか実行 されない。printf() が動くと main() は止まる。
複数の手続きを実行する主体は、「プロセス(process)」や「スレッド (thread)」。
図? 逐次プログラムと並行プログラムの動作(時間と空間の観点から)
並列処理と分散処理の目的
ネットワークで接続された複数のコンピュータ(multiple computers that are connected with a network)から構成される分散システム(distributed systems)と対比する時には、集中型システム(centralized systems))と呼ばれ る。
図? 集中型システムのハードウェア (hardware of a centralized system)
1950年代、60年代のハードウェアは、高い。
特に高いCPUを有効に活用したい。
図? バッチ処理における CPU と入出力装置の並列動作 (parallel processing of CPU and I/O in batch processeing)
プロセスの概念の確立。プロセスとプログラムの分離。 一般のプログラミングは、逐次のまま。
OSだけ並行プログラミング
Concurrent Pascal, Solo。
OSの実装では、実際には、「割り込み禁止(disable interrupts)」という軽 量の相互排除命令が多用された。
1台の中央の大型コンピュータ(mainframe)に、複数の「(文字)端末 (terminal)」を接続。
端末を使っている人からすると専有しているように見える。
TSS で複数のアプリケーションを同時に走らせる。 CPU が貴重な時代。CPU を遊ばせない。
図? メインフレームと端末(a mainframe and terminals)
最近は、 コンピュータの高速化で、性能が余ってきた。もともとハードウェアn台でやっ ていた仕事を、ハードウェアとしては1台に集約する。並列処理の逆。
図? 仮想計算機によるサーバの集約(集約前) (before server consolidation by virtual machines)
図? 仮想計算機によるサーバの集約(集約後) (after server consolidation by virtual machines)
1999年VMware Workstation 以降、パソコン用の VM も広く使われる。
Flynn の分類 (Flynn's taxonomy)
MIMD の分類
Machシステムでの分類 ( Mach operating system kernel, Mach microkernel )
逐次に対して、どうやって並列性(parallelism)を抽出するかが主眼。 逐次プログラムでは、マルチプロセッサで走らせても1倍。
特定用途のマルチプロセッサ(special purpose parallel computer)。
スパコン(super computers)。ベクトルコンピュータ(vector computers)。
Occam/Transputer。CSPの実装。
MMU (memory management unit)の活用。分散共有メモリ(distributed shared memory, distributed virtual memory)。
図? 共有メモリ型マルチプロセッサ(バス共有) (Shared memory multiprocessor (with a shared bus))
図? 共有メモリ型マルチプロセッサ(クロスバースイッチ) (shared memory multiprocessor with a crossbar switch)
複数のCPUが別のメモリに同時にアクセスできる。 同じメモリならバスと同様に衝突する。OSは、ジャイアント・ロック(giant lock)。
図? 非均質共有メモリ型マルチプロセッサ (shared memory multiprocessor with non-uniform memory access)
相互結合ネットワークで、遠隔のメモリを CPU から直接アドレスでアクセス できる。ただし、速度は 100 倍くらい遅い。図? LANに接続されたPC
遠隔のメモリは、アクセスできない(no remote memory access)。 機種が違う(heterogeneous)こともある。
図? シングルスレッドのプロセスとマルチスレッドのプロセス / A process with a single thread and a process with multiple threads
軽量プロセスというと、内部にループを含むような語感がある。 A lightweight process can contain a loop in it.
マルチスレッドプログラミングは、非常に難しい。
どこでスレッドを使うべきか。
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
Per Brinch Hansen: "The programming language Concurrent Pascal," IEEE Transactions on Software Engineering, Vol.SE-1, No.2, pp.199-207, June 1975.
type procA_t = process(args...); var local variables... procedure proc1(args...); procedure proc2(args...); begin cycle ... end; end var procA1 : procA_t ; init procA1(args...);
type monA_t = monitor(args ・・・); var loal variables procedure entry proc1(args ・・・); procedure entry proc2(args ・・・); begin initialization of local variables; end var monA1 : monA_t ; init monA1(引数);ローカル変数は、entry の手続きでしかアクセスできない。 1 つのプロセスが entry の手続きを実行中は、他のプロセスは入れない(mutual exclusion)。
図? Concurrent Pascalのプロセスとモニタ/Processes and a monitor in Concurrent Pascal
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。block the current process. cv1.signal; 待っているプロセスがいれば全て再開させる。unblock all waiting processes.
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 block the current process. continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。 unblock a single process in the queue.
$ producer | consumer
2つのスレッドの間には、バッファを置く。
図? 有限バッファ(環状バッファ)、生産者、消費者 / bounded buffer (circular buffer), producer, consumer
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き procedures
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: wp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。
#include <pthread.h>
Linux, MacOSX, でのコンパイルとリンク。-lpthread
を付ける。
% cc -o create create.c -lpthread
% ./create
...
%
Solaris (Unix International系)でのコンパイルとリンク。
-D_REENTRANT
と-lpthread
を付ける。
% cc -D_REENTRANT -o create create.c -lpthread
% ./create
...
%
セマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に -lposix4
オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt
オプションを付ける。
図? fork-joinモデルの実現
後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)
1: 2: /* 3: create-join-2.c -- スレッドを2つ作るプログラム 4: */ 5: 6: #include <stdlib.h> /* malloc() */ 7: #include <stdio.h> /* printf() */ 8: #include <pthread.h> 9: 10: struct func1_arg { 11: int x; 12: }; 13: struct func2_arg { 14: int x; 15: }; 16: 17: void func1( struct func1_arg *arg ); 18: void func2( struct func2_arg *arg ); 19: 20: main() 21: { 22: pthread_t t1 ; 23: pthread_t t2 ; 24: struct func1_arg *arg1; 25: struct func2_arg *arg2; 26: 27: printf("main()\n"); 28: arg1 = malloc( sizeof(struct func1_arg) ); 29: if( arg1 == NULL ) 30: { 31: perror("no memory for arg1"); 32: exit( 1 ); 33: } 34: arg1->x = 10; 35: pthread_create( &t1, NULL, (void *)func1, (void *)arg1 ); 36: arg2 = malloc( sizeof(struct func2_arg) ); 37: if( arg2 == NULL ) 38: { 39: perror("no memory for arg2"); 40: exit( 1 ); 41: } 42: arg2->x = 20; 43: pthread_create( &t2, NULL, (void *)func2, (void *)arg2 ); 44: pthread_join( t1, NULL ); 45: pthread_join( t2, NULL ); 46: } 47: 48: void func1( struct func1_arg *arg ) 49: { 50: int i ; 51: for( i = 0 ; i<3 ; i++ ) 52: { 53: printf("func1( %d ): %d \n",arg->x, i ); 54: } 55: } 56: 57: void func2( struct func2_arg *arg ) 58: { 59: int i ; 60: for( i = 0 ; i<3 ; i++ ) 61: { 62: printf("func2( %d ): %d \n",arg->x, i ); 63: } 64: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/create-join-2.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/Makefile
% make create-join-2
gcc -c -o create-join-2.o create-join-2.c
gcc create-join-2.o -lpthread -o create-join-2
% ./create-join-2
main()
func1( 10 ): 0
func2( 20 ): 0
func1( 10 ): 1
func2( 20 ): 1
func1( 10 ): 2
func2( 20 ): 2
% ./create-join-2
main()
func1( 10 ): 0
func1( 10 ): 1
func1( 10 ): 2
func2( 20 ): 0
func2( 20 ): 1
func2( 20 ): 2
% ./create-join-2
main()
func1( 10 ): 0
func1( 10 ): 1
func1( 10 ): 2
func2( 20 ): 0
func2( 20 ): 1
func2( 20 ): 2
%
この例では、次の3つのスレッドが作られる。Three threads were created.
func1
から作られたスレッド t1
func2
から作られたスレッド t2
pthread_create()
により作られる。
どういう順序で実行されるかは、決まっていない(nondeterministic)。 スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても順序を決めたければ、 mutex や条件変数といった同期機能を使う。
pthread_create()
で指定された関数からリターンすると、そ
のスレッドが終了する。pthread_exit()
を呼び出してもよい。
ただし、
初期スレッド
が終了すると、プロセス全体が終了する。
exit()
システムコールを呼び出しても終了する。
プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。
主記憶やCPU時間といった資源は、横取り(preemption)しても平気なので、あま り問題になりない。
スレッドの場合は、プログラミング言語の変数(variables)。
変更されない変数(immutable variables)の値を読む(read-only)だけなら、特 に問題は起きない。それ意外の時、特に、複数のスレッドで値を読み書きする 時に問題が起きる。
1: 2: /* 3: * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム 4: */ 5: 6: #include <stdio.h> 7: #include <pthread.h> 8: 9: void thread_A(), thread_B(); 10: int shared_resource ; 11: 12: main() { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: int x; 16: shared_resource = 0 ; 17: pthread_setconcurrency( 2 ); 18: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 19: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 20: pthread_join( t1, NULL ); 21: pthread_join( t2, NULL ); 22: x = shared_resource ; 23: printf("main(): shared_resource == %d\n", x ); 24: } 25: 26: void thread_A() 27: { 28: int i, x ; 29: for( i = 0 ; i<1000000 ; i++ ) 30: { 31: x = shared_resource ; 32: x = x + 1 ; 33: shared_resource = x ; 34: } 35: } 36: 37: void thread_B() { 38: int i, x ; 39: for( i = 0 ; i<1000000 ; i++ ) { 40: x = shared_resource ; 41: x = x + 1 ; 42: shared_resource = x ; 43: } 44: }
pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/mutex-nolock.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/Makefile
% make mutex-nolock
gcc -c -o mutex-nolock.o mutex-nolock.c
gcc mutex-nolock.o -lpthread -o mutex-nolock
% ./mutex-nolock
main(): shared_resource == 1014838
% ./mutex-nolock
main(): shared_resource == 1062224
% ./mutex-nolock
main(): shared_resource == 1096882
% ./mutex-nolock
main(): shared_resource == 1091488
% ./mutex-nolock
main(): shared_resource == 1096291
%
thread_A()
, thread_B()
と2つのスレッドが作
られている。整数型の変数 shared_resource
は、その2つの
スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予
定だが、実際に実行すると、そうはならない。実行する度に答えが違う。
単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。
このプログラムが SMP で動いているとして、動きを 考えてえる。
thread_A() thread_B() : : : : 31: x = shared_resource ; 40: x = shared_resource ; 32: x = x + 1 ; 41: x = x + 1 ; 33: shared_resource = x ; 42: shared_resource = x ; : : : :
shared_resource++
と書いてもだめ。複数の機械語命令に分割
されるので。
Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。
pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER; : : pthread_mutex_lock( &murex1 ); <際どい部分。critical section.> pthread_mutex_unlock( &mutex1 );
相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つ(at most one)にする。
プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。
1: /* 2: * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム 3: */ 4: 5: #include <stdio.h> 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: main() { 13: pthread_t t1 ; 14: pthread_t t2 ; 15: int x; 16: shared_resource = 0 ; 17: pthread_mutex_init( &mutex1, NULL ); 18: pthread_setconcurrency( 2 ); 19: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 20: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 21: pthread_join( t1, NULL ); 22: pthread_join( t2, NULL ); 23: pthread_mutex_lock( &mutex1 ); 24: x = shared_resource ; 25: pthread_mutex_unlock( &mutex1 ); 26: printf("main(): shared_resource == %d\n", x ); 27: } 28: 29: void thread_A() 30: { 31: int i, x ; 32: for( i = 0 ; i<1000000 ; i++ ) 33: { 34: pthread_mutex_lock( &mutex1 ); 35: x = shared_resource ; 36: x = x + 1 ; 37: shared_resource = x ; 38: pthread_mutex_unlock( &mutex1 ); 39: } 40: } 41: 42: void thread_B() { 43: int i, x ; 44: for( i = 0 ; i<1000000 ; i++ ) { 45: pthread_mutex_lock( &mutex1 ); 46: x = shared_resource ; 47: x = x + 1 ; 48: shared_resource = x ; 49: pthread_mutex_unlock( &mutex1 ); 50: } 51: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/mutex-lock.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/Makefile
% make mutex-lock
gcc -c -o mutex-lock.o mutex-lock.c
gcc mutex-lock.o -lpthread -o mutex-lock
% ./mutex-lock
main(): shared_resource == 2000000
% ./mutex-lock
main(): shared_resource == 2000000
% ./mutex-lock
main(): shared_resource == 2000000
%
thread_A() thread_B() : : 34: pthread_mutex_lock( &mutex1 ); : 45: pthread_mutex_lock( &mutex1 ); 35: x = shared_resource ; <他のスレッドが実行中なので 36: x = x + 1 ; この状態でしばらく待たされる> 37: shared_resource = x ; : 38: pthread_mutex_unlock( &mutex1 ); : : <実行再開> : 46: x = shared_resource ; 47: x = x + 1 ; 48: shared_resource = x ; 49: pthread_mutex_unlock( &mutex1 );後から来た
thread_B()
は、他のスレッドが実行中は、待たさ
れる。
thread_A | thread_B
2つのスレッドの間には、
有限バッファ(bounded buffer)を置く。
バッファが空の時、thread_B() は、thread_A() が何かデータをバッファに入 れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッ ファから何かデータを取り出すのを待つ。
条件変数の操作 operations:
1: 2: /* 3: * condv-buffer.c -- 条件変数を使った有限バッファ 4: */ 5: 6: #include <stdio.h> /* printf() */ 7: #include <stdlib.h> /* malloc(), exit() */ 8: #include <pthread.h> 9: 10: #define BUFFER_SIZE 4 /* バッファの大きさ */ 11: struct circular_buffer 12: { 13: int rp ; /* 読み出す位置 */ 14: int wp ; /* 書き込む位置 */ 15: int used ; /* バッファ内の要素数 */ 16: int data[BUFFER_SIZE]; /* データを保存する場所 */ 17: pthread_mutex_t mutex ; /* この構造体の相互排除のための mutex */ 18: pthread_cond_t not_full ; /* バッファが一杯ではない状態を待つための条件変数 */ 19: pthread_cond_t not_empty ; /* バッファが空ではない状態を待つための条件変数 */ 20: }; 21: 22: void thread_A(struct circular_buffer *b); 23: void thread_B(struct circular_buffer *b); 24: 25: void put( struct circular_buffer *b,int x ) 26: { 27: pthread_mutex_lock( &b->mutex ); 28: loop: if( b->used == BUFFER_SIZE ) 29: { 30: pthread_cond_wait( &b->not_full,&b->mutex ); 31: goto loop; 32: } 33: b->data[ b->wp++ ] = x ; 34: if( b->wp >= BUFFER_SIZE ) 35: b->wp = 0 ; 36: b->used ++ ; 37: pthread_cond_broadcast( &b->not_empty ); 38: pthread_mutex_unlock( &b->mutex ); 39: } 40: 41: int get( struct circular_buffer *b ) 42: { 43: int x ; 44: pthread_mutex_lock( &b->mutex ); 45: loop: if( b->used == 0 ) 46: { 47: pthread_cond_wait( &b->not_empty,&b->mutex ); 48: goto loop; 49: } 50: x = b->data[ b->rp++ ] ; 51: if( b->rp >= BUFFER_SIZE ) 52: b->rp = 0 ; 53: b->used -- ; 54: pthread_cond_broadcast( &b->not_full ); 55: pthread_mutex_unlock( &b->mutex ); 56: return( x ); 57: } 58: 59: main() 60: { 61: pthread_t t1 ; 62: pthread_t t2 ; 63: struct circular_buffer *b ; 64: b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer)); 65: if( b == NULL ) 66: { 67: perror("no memory for struct buffer\n"); 68: exit( -1 ); 69: } 70: b->rp = 0 ; 71: b->wp = 0 ; 72: b->used = 0 ; 73: pthread_mutex_init( &b->mutex, NULL ); 74: pthread_cond_init( &b->not_full,NULL ); 75: pthread_cond_init( &b->not_empty,NULL ); 76: pthread_setconcurrency( 2 ); 77: pthread_create( &t1, NULL, (void *)thread_A, (void *)b ); 78: pthread_create( &t2, NULL, (void *)thread_B, (void *)b ); 79: pthread_join( t1, NULL ); 80: pthread_join( t2, NULL ); 81: } 82: 83: void thread_A( struct circular_buffer *b ) /* producer */ 84: { 85: int i,x ; 86: for( i = 0 ; i<10 ; i++ ) 87: { 88: x = i ; 89: printf("thread_A(): put( %d )\n",x ); 90: put( b,x ); 91: } 92: } 93: 94: void thread_B( struct circular_buffer *b ) /* consumer */ 95: { 96: int i, x ; 97: for( i = 0 ; i<10 ; i++ ) 98: { 99: x = get( b ); 100: printf("thread_B(): get() %d.\n", x ); 101: } 102: }
put()
は、バッファにデータを追加する時に使う手続き。
基本的には、入口で pthread_mutex_lock()
し、
出口で pthread_mutex_unlock()
する。
バッファが一杯の時には、条件変数b->not_full
で、
一杯でないという条件になるまで待つ。
待っている間は、mutex のロックは解除される。
pthread_cond_wait()
からリターンして来る時には、もう一度
ロックされた状態に戻る。
pthread_cond_wait()
でスリープしてある間に、
変数
(rp,wp,data
)が書き換えられている可能性があるので、
もう一度最初から調べる。
get()
は、バッファからデータを取り出す時に使う手
続き。put()
とほぼ対称形。バッファが空の時に、wait
し、バッファがもはや一杯ではないことをbroadcast する。
thread_A()
は、10回バッファにデータを書き込むスレッド。
thread_B()
は逆に、10回バッファからデータを読み出すス
レッド。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/condv-buffer.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/Makefile
% make condv-buffer
gcc -c -o condv-buffer.o condv-buffer.c
gcc condv-buffer.o -lpthread -o condv-buffer
% ./condv-buffer
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
%
複数のスレッドが同時に動いている。バッファにためられるのは、最大4なの
に、put() が 5 回連続して成功しているように見える。printf() の順番と
put(), get() の順番は違うことがある。
pthread_mutex_lock( &mutex ); while( 1 ) { if( condition_checking() ) pthread_cond_wait( &cv, &mutex ); perform(); }
バッファに要素を「1つずつ」追加しているので、
pthread_cond_signal()
でもよい。
pthread_cond_broadcast()
に変えても動くようにプログラムを
作る。
pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。
「1つずつ」ではなく、複数個同時に読み書きする時には、
pthread_cond_broadcast()
でないとだめ。
迷った時には、pthread_cond_broadcast()
。
図? 開いたモジュールと閉じたモジュール
1: /* 2: * mutex-reclock-normal.c -- 通常の mutex を使う例(デッドロック) 3: */ 4: 5: #include <stdio.h> /* printf() */ 6: #include <pthread.h> 7: 8: void thread_A(), thread_B(); 9: int shared_resource ; 10: pthread_mutex_t mutex1 ; 11: 12: deposit( int n ) 13: { 14: pthread_mutex_lock( &mutex1 ); 15: shared_resource += n ; 16: pthread_mutex_unlock( &mutex1 ); 17: } 18: 19: add_interest() 20: { 21: int i ; 22: pthread_mutex_lock( &mutex1 ); 23: i = shared_resource * 0.05 ; 24: deposit( i ); 25: pthread_mutex_unlock( &mutex1 ); 26: } 27: 28: main() { 29: pthread_t t1 ; 30: pthread_t t2 ; 31: int x; 32: shared_resource = 1000000 ; 33: pthread_mutex_init( &mutex1, NULL ); 34: 35: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 36: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 37: pthread_join( t1, NULL ); 38: pthread_join( t2, NULL ); 39: pthread_mutex_lock( &mutex1 ); 40: x = shared_resource ; 41: pthread_mutex_unlock( &mutex1 ); 42: printf("main(): shared_resource == %d\n", x ); 43: } 44: 45: void thread_A() 46: { 47: printf("thread_A(): deposit( 10000 ) ... \n"); 48: deposit( 10000 ); 49: printf("thread_A(): deposit( 10000 ) done. \n"); 50: } 51: 52: void thread_B() 53: { 54: printf("thread_B(): add_interest() ... \n"); 55: add_interest(); 56: printf("thread_B(): add_interest() done. \n"); 57: }実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/mutex-reclock-normal.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/Makefile
% make mutex-reclock-normal
gcc -c -o mutex-reclock-normal.o mutex-reclock-normal.c
gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal
% ./mutex-reclock-normal
thread_A(): deposit( 10000 ) ...
thread_B(): add_interest() ...
thread_A(): deposit( 10000 ) done.
^C (強制終了)
%
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。
#CFLAGS= -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NPこの結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c
1: /* 2: * mutex-reclock-recursive.c -- 再帰的 mutex を使う例 3: */ ... 28: static int 29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex ) 30: { 31: pthread_mutexattr_t attr ; 32: int err ; 33: if( (err=pthread_mutexattr_init( &attr )) < 0 ) 34: return( 0 ); 35: if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 ) 36: return( 0 ); 37: err = pthread_mutex_init( mutex,&attr ); 38: return( err ); 39: } 40: 41: main() 42: { 43: pthread_t t1 ; 44: pthread_t t2 ; 45: int x; 46: shared_resource = 1000000 ; 47: my_pthread_mutex_init_recursive( &mutex1 ); 48: 49: pthread_create( &t1, NULL, (void *)thread_A, 0 ); 50: pthread_create( &t2, NULL, (void *)thread_B, 0 ); 51: pthread_join( t1, NULL ); 52: pthread_join( t2, NULL ); 53: pthread_mutex_lock( &mutex1 ); 54: x = shared_resource ; 55: pthread_mutex_unlock( &mutex1 ); 56: printf("main(): shared_resource == %d\n", x ); 57: }実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/mutex-reclock-recursive.c
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/Makefile
% make mutex-reclock-recursive
gcc -c -o mutex-reclock-recursive.o mutex-reclock-recursive.c
gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive
% ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ...
thread_A(): deposit( 10000 ) done.
thread_B(): add_interest() ...
thread_B(): add_interest() done.
main(): shared_resource == 1060500
% ./mutex-reclock-recursive
thread_A(): deposit( 10000 ) ...
thread_A(): deposit( 10000 ) done.
thread_B(): add_interest() ...
thread_B(): add_interest() done.
main(): shared_resource == 1060500
%
注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。
#CFLAGS= -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NPこの結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c
Java には、最初から言語のレベルでスレッドの機能が入っている。
Java | Pthread |
new Thread(); start(); | pthread_create() |
join() | pthread_join() |
synchronized | pthread_mutex_lock()とpthread_mutex_unlock()の組 |
wait() | pthread_cond_wait() |
wait(long timeout) | pthread_cond_timedwait() |
notify() | pthread_cond_signal() |
notifyAll() | pthread_cond_broadcast() |
Java の synchronized は、再帰可能(recursive)。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。
Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。
Pthreadでの有限バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。 生産者側と消費者側が同時に待つことはないという性質を利用する。
Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。
1: 2: /* 3: * CircularBuffer.java -- Java による環状バッファ 4: */ 5: 6: class CircularBuffer 7: { 8: static final int BUFFER_SIZE = 4 ; 9: int rp ; // 読み出す位置 10: int wp ; // 書き込む位置 11: int data[]; // データを保存する場所 12: int used ; // バッファ内の要素数 13: CircularBuffer() 14: { 15: data = new int[BUFFER_SIZE]; 16: rp = 0 ; 17: wp = 0 ; 18: used = 0; 19: } 20: 21: public synchronized void put( int x ) throws InterruptedException 22: { 23: while( used == data.length ) 24: wait(); 25: data[ wp++ ] = x ; 26: if( wp == data.length ) 27: wp = 0 ; 28: used++ ; 29: notifyAll(); 30: } 31: public synchronized int get() throws InterruptedException 32: { 33: int x ; 34: while( used == 0 ) 35: wait(); 36: x = data[ rp++ ] ; 37: if( rp >= data.length ) 38: rp = 0 ; 39: used--; 40: notifyAll(); 41: return( x ); 42: } 43: }
1: 2: /* 3: * CircularBufferDemo.java -- Java による有限バッファのデモ 4: */ 5: 6: class Thread_A implements Runnable // Producer 7: { 8: CircularBuffer b; 9: Thread_A( CircularBuffer b ) 10: { 11: this.b = b; 12: } 13: public void run() 14: { 15: int i,x ; 16: for( i = 0 ; i<10 ; i++ ) 17: { 18: try 19: { 20: x = i ; 21: System.out.println("Thread_A(): put( "+x+" )"); 22: b.put( x ); 23: } 24: catch( InterruptedException e ) 25: { 26: System.err.println("Thread_A(): Interrupted"); 27: break; 28: } 29: } 30: } 31: } 32: 33: class Thread_B implements Runnable // Producer 34: { 35: CircularBuffer b; 36: Thread_B( CircularBuffer b ) 37: { 38: this.b = b; 39: } 40: public void run() 41: { 42: int i,x ; 43: for( i = 0 ; i<10 ; i++ ) 44: { 45: try 46: { 47: x = b.get(); 48: System.out.println("Thread_B(): got() "+x+"."); 49: } 50: catch( InterruptedException e ) 51: { 52: System.err.println("Thread_B(): Interrupted"); 53: break; 54: } 55: } 56: } 57: } 58: 59: class CircularBufferDemo 60: { 61: public static void main(String argv[]) 62: { 63: final CircularBuffer b = new CircularBuffer(); 64: Thread t1 = new Thread( new Thread_A(b) ); 65: t1.start(); 66: Thread t2 = new Thread( new Thread_B(b) ); 67: t2.start(); 68: try 69: { 70: t1.join(); 71: t2.join(); 72: } 73: catch( InterruptedException e ) 74: { 75: System.err.println("main(): Interrupted"); 76: } 77: } 78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/CircularBuffer.java
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/CircularBufferDemo.java
% javac CircularBuffer.java CircularBufferDemo.java
% java CircularBufferDemo
Thread_A(): put( 0 )
Thread_A(): put( 1 )
Thread_A(): put( 2 )
Thread_A(): put( 3 )
Thread_A(): put( 4 )
Thread_B(): got() 0.
Thread_B(): got() 1.
Thread_B(): got() 2.
Thread_B(): got() 3.
Thread_B(): got() 4.
Thread_A(): put( 5 )
Thread_A(): put( 6 )
Thread_B(): got() 5.
Thread_A(): put( 7 )
Thread_A(): put( 8 )
Thread_A(): put( 9 )
Thread_B(): got() 6.
Thread_B(): got() 7.
Thread_B(): got() 8.
Thread_B(): got() 9.
%
Ruby には、標準でスレッドの機能が入っているが、標準ではGiant VM lock (GVL) を保持して実行されるので、並列に実行されない。ただし、入出力を行 うと GVL をリリースするので、CPU 処理と複数の入出力処理を重ね合わること はできる。Parallel 等の拡張ライブラリを使うと、CPU処理を並列に実行する こともできる。
Ruby | Pthread |
Thread.new() { } | pthread_create() |
Thread#join() | pthread_join() |
Mutex#synchronize {} | pthread_mutex_lock(), pthread_mutex_unlock() |
Mutex#lock() Mutex#unlock() | |
ConditionVariable#wait(mutex) | pthread_cond_wait() |
ConditionVariable#wait(mutex,timeout) | pthread_cond_timedwait() |
ConditionVariable#signal() | pthread_cond_signal() |
ConditionVariable#broadcast() | pthread_cond_broadcast() |
Ruby の Mutex は、再帰可能ではない。再帰可能なもの、 PTHREAD_MUTEX_RECURSIVE 相当のものが必要ならば、Mutex ではなく Monitor/Sync を使う。
Pthreads のプログラムを、Mutex/Monitor/Sync と ConditionVariable を使っ て同じロジックで Ruby のプログラムに書き直せる。
1: #!/usr/bin/env ruby 2: # -*- coding: utf-8 -*- 3: # condv-buffer-ruby.rb -- Circular Buffer in Ruby 4: 5: require 'thread' 6: 7: class CircularBuffer 8: BUFFER_SIZE = 4 9: def initialize() 10: @data = Array.new(BUFFER_SIZE) 11: @rp = 0 12: @wp = 0 13: @used = 0 14: @mutex = Mutex.new() 15: @not_full = ConditionVariable.new() 16: @not_empty = ConditionVariable.new() 17: end 18: def put( x ) 19: @mutex.synchronize { 20: while( @used == @data.length() ) 21: @not_full.wait( @mutex ) 22: end 23: @data[ @wp ] = x 24: @wp += 1 25: if( @wp >= BUFFER_SIZE ) 26: @wp = 0 27: end 28: @used += 1 29: @not_empty.broadcast() 30: } 31: end 32: def get() 33: @mutex.synchronize { 34: while( @used == 0 ) 35: @not_empty.wait( @mutex ) 36: end 37: x = @data[ @rp ] 38: @rp += 1 39: if( @rp >= BUFFER_SIZE ) 40: @rp = 0 41: end 42: @used -= 1 43: @not_full.broadcast() 44: return( x ) 45: } 46: end 47: end 48: 49: def main() 50: b = CircularBuffer.new() 51: t1 = Thread.new() { 52: thread_A( b ) 53: } 54: t2 = Thread.new() { 55: thread_B( b ) 56: } 57: t1.join() 58: t2.join() 59: end 60: 61: def thread_A( b ) # producer 62: i = 0 63: while( i < 10 ) 64: x = i 65: printf("thread_A(): put( %d )\n",x ) 66: b.put( x ); 67: i += 1 68: end 69: end 70: 71: def thread_B( b ) # consumer 72: i = 0 73: while( i < 10 ) 74: x = b.get() 75: printf("thread_B(): get() %d.\n", x ); 76: i += 1 77: end 78: end 79: 80: main()
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-04-18/ex/condv-buffer-ruby.rb
% ruby condv-buffer-ruby.rb
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_A(): put( 9 )
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_B(): get() 8.
thread_B(): get() 9.
%
締切りは、2016年4月24日 (17日)、 23:59:59 とする。
The program of bounded buffer uses two condition variables. Rewrite this program to use a single condition variables. Since a thread can block either on put() or get(), we can share a single queue of a condition variable.
Add following two functions to the program of recursive mutex . The first function returns the balance of the bank account. The second function withdraws the given amount of money from the bank account. If the bank account does not have sufficient money, this function should nothing. You may use the programming language C, Java, or Ruby.
int balance() { ... // return the balance 残高を返す } int withdraw( int n ) { ... // return 1 if succeeded. 成功の時は 1 を返す // return 0 if failed due to insufficient money. 残高不足の時は 0 }
プログラミング言語としては、C、Java、または、Ruby を使いなさい。 You may use the programming language C, Java, or Ruby.
Rewrite the bounded buffer programs (in C with Pthread Java, or Ruby ) to use semaphores.
プログラミング言語としては、C、Java、または、Ruby を使いなさい。 You may use the programming language C, Java, or Ruby.
セマフォを使った循環バッファのプログラムの作成は、次の教科書の練習問 題にもなっている(巻末に回答もある)。 The following text book of operating systems contains exercise of a bounded buffer. The answer is also included in the end of the book.
清水謙多郎: "オペレーティングシステム",岩波書店 (1992). ISBN: 4000078526.