マルチスレッド・プログラミング(1)/ Multithread programming (1)

並行システム

                               システム情報系情報工学域,
			       システム情報工学研究科コンピュータサイエンス専攻
                               新城 靖
                               <yas@cs.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/

■スレッド・プログラミング / multithread programming

◆スレッドとは / What is a thread

スレッド(thread) あるいは、 軽量プロセス(lightweight processes) とは、 1つの保護の単位(unit of protection)としての プロセス(タスク,あるいは,アドレス空間; a process, task, or address space) 内部にふくまれている平行処理の単位(unit of concurrent processing) (論理的な並列処理の単位(unit of logical parallel processing))。

シングルスレッドのプログラム a single thread program
1度に1つの手続き(Cの関数)しか動かない。 runs one function at the same time
マルチスレッドのプログラム a multithread program
1度にスレッドの数だけの手続きが論理的には同時に動く。 runs two or more functions at the same time (同期でブロックされているものも含む)

図? シングルスレッドのプロセスとマルチスレッドのプロセス

図? シングルスレッドのプロセスとマルチスレッドのプロセス / A process with a single thread and a process with multiple threads

軽量プロセスというと、内部にループを含むような語感がある。 A lightweight process can contain a loop in it.

◆スレッドの利用目的/ purpose of multithread programing

◆本当にスレッドが必要か/"Why Threads Are A Bad Idea (for most purposes)

John K. Ousterhout, "Why Threads Are A Bad Idea (for most purposes)", Invited Talk at the 1996 USENIX Technical Conference (January 25, 1996). [PDF]

マルチスレッドプログラミングは、非常に難しい。

シングルスレッドのイベント駆動で書けるなら、その方がいい。 GUI、分散など。

どこでスレッドを使うべきか。

◆スレッドの内容/thread contents

個々のスレッドごとに持つもの each thread has プロセス全体で共有 shared among threads in a process

◆スレッドの操作 thread operations

◆Unix のユーザ・レベルの(重たい)プロセスの操作/ operations of heavyweight processes in Unix

Unixカーネル内の並行処理/ concurrent processing in a Unix kernel

PDP-11 時代からのモデル

■Concurrent Pascal

逐次型プログラミング言語 Pascal を拡張したもの。 Extension of a sequential programming language, Pascal.

◆参考文献 references

Per Brinch Hansen: "The Architecture of Concurrent Program", Prentice Hall (1977).

Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).

Per Brinch Hansen: "The programming language Concurrent Pascal," IEEE Transactions on Software Engineering, Vol.SE-1, No.2, pp.199-207, June 1975.

◆プロセス process

プロセスは、データ型(data type)として定義。 変数がインスタンス。内部に無限ループ(infinite loop)
type procA_t = process(args...);
   var
      local variables...
   procedure proc1(args...);
   procedure proc2(args...);
begin
   cycle
       ...
   end;
end

var procA1 : procA_t ;
init procA1(args...);

◆モニタ monitor

モニタは、プロセス間で共有されるデータ構造。 a monitor is a shared data structure among processes.
type monA_t = monitor(args ・・・);
   var
      loal variables
   procedure entry proc1(args ・・・);
   procedure entry proc2(args ・・・);
begin
   initialization of local variables;
end

var monA1 : monA_t ;
init monA1(引数);
ローカル変数は、entry の手続きでしかアクセスできない。 1 つのプロセスが entry の手続きを実行中は、他のプロセスは入れない(mutual exclusion)。

図? プロセス3個、モニタ1個、entry手続き3個、局所変数

図? Concurrent Pascalのプロセスとモニタ/Processes and a monitor in Concurrent Pascal

◆条件変数とキュー condition variable and queue

  cv1 : condition;
  cv1.wait;      呼び出したプロセスを待たせる。block the current process.
  cv1.signal;   待っているプロセスがいれば全て再開させる。unblock all waiting processes.
  q1 : queue;
  delay(q1);      呼び出したプロセスをそのキューで待たせる。
                  block the current process.
  continue(q1);   そのキューで待っているプロセスがいれば1つだけ再開させる。
                  unblock a single process in the queue.
最近の言語(Pthread, Java 等)では、Concurrent Pascal の condition variable もqueue も、ともに条件変数。

◆有限バッファ bounded buffer

Unix のパイプ(pipe)のようなことを Concurrent Pascal のプロセス(スレッド)を使って実行したい。
$ producer | consumer [←]
2つのスレッドの間には、バッファを置く。

図? 環状バッファ(有限バッファ)、生産者、消費者

図? 有限バッファ(環状バッファ)、生産者、消費者 / bounded buffer (circular buffer), producer, consumer

バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。

手続き procedures

定数と変数 Constants and variables
   1: const BUFFER_SIZE = 4;
   2: type circular_buffer =
   3: monitor
   4: var
   5:     rp : integer ;
   6:     wp : integer ;
   7:     data: array [0..BUFFER_SIZE-1] of integer;
   8:     used: integer;
   9:     not_empty : condition;
  10:     not_full  : condition;
  11: 
  12:     procedure entry put(x:integer);
  13:     begin
  14:         while( used = BUFFER_SIZE ) do
  15:             non_full.wait;
  16:         data[wp] := x;
  17:         wp := wp + 1 ;
  18:         if( wp >= BUFFER_SIZE )
  19:             wp := 0 ;
  20:         used := used + 1 ;
  21:         not_empty.signal;
  22:     end
  23: 
  24:     procedure entry get(result x:integer);
  25:     begin
  26:         while( used = 0 ) then
  27:             not_empty.wait;
  28:         x := data[rp];
  29:         rp := rp + 1 ;
  30:         if( rp >= BUFFER_SIZE )
  31:             rp := 0 ;
  32:         used := used - 1 ;
  33:         not_full.signal;
  34:     end
  35: begin
  36:     rp := 0 ;
  37:     wp := 0 ;
  38:     used := 0 ;
  39: end;
  40: 
  41: ...
  42: var buf : circular_buffer ;
  43: init buf;
  44: ...
  45: 

■Pthread

Pthread は、POSIX 1003.1c-1995という標準に準拠したスレッド・ライブラリ。 POSIX Thread とも呼ばれる。

◆Pthreadを利用したプログラムのコンパイル/compiling pthread programs

Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。

#include <pthread.h>

Linux, MacOSX, でのコンパイルとリンク。-lpthread を付ける。

% cc -o create create.c -lpthread [←]
% ./create [←]
...
% []

Solaris (Unix International系)でのコンパイルとリンク。 -D_REENTRANT-lpthread を付ける。

% cc -D_REENTRANT -o create create.c -lpthread [←]
% ./create [←]
...
% []
セマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に -lposix4 オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt オプションを付ける。

■Pthreadにおけるスレッドの生成・消滅/creation and termination of threads in Pthread

スレッドは、普通のプログラムの、 サブルーチン(C言語の関数、手続き; subroutine, C functions, procedures) に近い。 サブルーチンの場合,呼び出すと、呼び出された方が動き、自分自身は,止ま る。スレッドでは,新たにスレッドを生成した場合,生成した方と生成さ れた方は,論理的には2つとも同時に動く。

◆fork-joinモデル/fork-join model

図? fork-joinモデルの実現

図? fork-joinモデルの実現

  1. 逐次処理(スレッド/プロセスが1つ)の状態から始まる start with sequential processing
  2. 並列性が必要になった時、fork命令で複数のスレッド/プロセスに分か れて並列処理を行う。 fork when parallel processing is needed.
  3. 並列に動作できる部分が終ると join 命令で再び逐次処理に戻る。 join after parallel processing ends.

◆Unixのfork()/fork() in Unix

fork() システムコールでコピーが作られる。 join の代わりに、子どもは exit()、親は wait()。

◆Pthreadはcreate()/create() in Pthread

Pthread では、コピーではなく create で新たにスレッドを作る。同じ関数を 実行したい時には、直接 call する。(別の関数を実行するなら呼ばなくても よい。) 子スレッドでは、pthread_exit() (トップの 手続きからリターン)、 親は、pthread_join() する。

後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)

◆スレッドの生成とjoin/thread creation and join

This program creates two new threads.
   1: 
   2: /*
   3:         create-join-2.c -- スレッドを2つ作るプログラム
   4: */
   5: 
   6: #include <stdlib.h>     /* malloc() */
   7: #include <stdio.h> /* printf() */
   8: #include <pthread.h>
   9: 
  10: struct func1_arg {
  11:         int     x;
  12: };
  13: struct func2_arg {
  14:         int     x;
  15: };
  16: 
  17: void func1( struct func1_arg *arg );
  18: void func2( struct func2_arg *arg );
  19: 
  20: main()
  21: {
  22:     pthread_t t1 ;
  23:     pthread_t t2 ;
  24:     struct func1_arg *arg1;
  25:     struct func2_arg *arg2;
  26:     
  27:         printf("main()\n");
  28:         arg1 = malloc( sizeof(struct func1_arg) );
  29:         if( arg1 == NULL )
  30:         {
  31:             perror("no memory for arg1");
  32:             exit( 1 );
  33:         }
  34:         arg1->x = 10;
  35:         pthread_create( &t1, NULL, (void *)func1, (void *)arg1 );
  36:         arg2 = malloc( sizeof(struct func2_arg) );
  37:         if( arg2 == NULL )
  38:         {
  39:             perror("no memory for arg2");
  40:             exit( 1 );
  41:         }
  42:         arg2->x = 20;
  43:         pthread_create( &t2, NULL, (void *)func2, (void *)arg2 );
  44:         pthread_join( t1, NULL );
  45:         pthread_join( t2, NULL );
  46: }
  47: 
  48: void func1( struct func1_arg *arg )
  49: {
  50:     int i ;
  51:         for( i = 0 ; i<3 ; i++ )
  52:         {
  53:             printf("func1( %d ): %d \n",arg->x, i );
  54:         }
  55: }
  56: 
  57: void func2( struct func2_arg *arg  )
  58: {
  59:     int i ;
  60:         for( i = 0 ; i<3 ; i++ )
  61:         {
  62:             printf("func2( %d ): %d \n",arg->x, i );
  63:         }
  64: }
実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/create-join-2.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/Makefile [←]
% make create-join-2 [←]
gcc    -c -o create-join-2.o create-join-2.c
gcc create-join-2.o -lpthread -o create-join-2
% ./create-join-2  [←]
main()
func1( 10 ): 0 
func2( 20 ): 0 
func1( 10 ): 1 
func2( 20 ): 1 
func1( 10 ): 2 
func2( 20 ): 2 
% ./create-join-2 [←]
main()
func1( 10 ): 0 
func1( 10 ): 1 
func1( 10 ): 2 
func2( 20 ): 0 
func2( 20 ): 1 
func2( 20 ): 2 
% ./create-join-2 [←]
main()
func1( 10 ): 0 
func1( 10 ): 1 
func1( 10 ): 2 
func2( 20 ): 0 
func2( 20 ): 1 
func2( 20 ): 2 
% []
この例では、次の3つのスレッドが作られる。Three threads were created.
  1. main を実行しているスレッド
  2. func1 から作られたスレッド t1
  3. func2 から作られたスレッド t2
マルチスレッド・プログラミングでは、main関数もまた1つのスレッドが実行 していると考える。これを 初期スレッド(initial thread) 、あるいは、 メインスレッド(main thread) とよぶ。PThread では、 メインスレッド以外のスレッドは、 pthread_create() により作られる。

どういう順序で実行されるかは、決まっていない(nondeterministic)。 スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても順序を決めたければ、 mutex や条件変数といった同期機能を使う。

pthread_create()で指定された関数からリターンすると、そ のスレッドが終了する。pthread_exit() を呼び出してもよい。 ただし、 初期スレッド が終了すると、プロセス全体が終了する。 exit() システムコールを呼び出しても終了する。

■Pthread mutexによるスレッド間の同期/inter-thread synchronization with mutex in Pthread

◆共有資源/shared resource

複数のプロセス/スレッドで共有しなければならないもの。

プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。

主記憶やCPU時間といった資源は、横取り(preemption)しても平気なので、あま り問題になりない。

スレッドの場合は、プログラミング言語の変数(variables)。

変更されない変数(immutable variables)の値を読む(read-only)だけなら、特 に問題は起きない。それ以外の時、特に、複数のスレッドで値を読み書きする 時に問題が起きる。

◆複数のスレッドによる共有資源のアクセス(ロックなし、バグ入り)/accessing a shared resource by multiple threads without locking (with a bug)

In this program, two threads access a shared resource without locking.
   1: 
   2: /*
   3:  * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム
   4:  */
   5: 
   6: #include <stdio.h>
   7: #include <pthread.h>
   8: 
   9: void thread_A(), thread_B();
  10: int     shared_resource ;
  11: 
  12: main() {
  13:     pthread_t t1 ;
  14:     pthread_t t2 ;
  15:         shared_resource = 0 ;
  16:         pthread_setconcurrency( 2 );
  17:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  18:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  19:         pthread_join( t1, NULL );
  20:         pthread_join( t2, NULL );
  21:         printf("main(): shared_resource == %d\n", shared_resource );
  22: }
  23: 
  24: void thread_A()
  25: {
  26:     int i, x ;
  27:         for( i = 0 ; i<1000000 ; i++ )
  28:         {
  29:             x = shared_resource ;
  30:             x = x + 1 ;
  31:             shared_resource = x ;
  32:         }
  33: }
  34: 
  35: void thread_B() {
  36:     int i, x ;
  37:         for( i = 0 ; i<1000000 ; i++ ) {
  38:             x = shared_resource ;
  39:             x = x + 1 ;
  40:             shared_resource = x ;
  41:         }
  42: }

pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。

◆実行結果(ロックなし)/execution results without locking

SMP の Solaris, MacOSX での実行結果。 Execution results on an SMP with Solaris or Mac OS X.
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/mutex-nolock.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/Makefile [←]
% make mutex-nolock [←]
gcc    -c -o mutex-nolock.o mutex-nolock.c
gcc mutex-nolock.o -lpthread -o mutex-nolock
% ./mutex-nolock  [←]
main(): shared_resource == 1014838
% ./mutex-nolock [←]
main(): shared_resource == 1062224
% ./mutex-nolock [←]
main(): shared_resource == 1096882
% ./mutex-nolock [←]
main(): shared_resource == 1091488
% ./mutex-nolock [←]
main(): shared_resource == 1096291
% []
thread_A(), thread_B()と2つのスレッドが作 られている。整数型の変数 shared_resource は、その2つの スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予 定だが、実際に実行すると、そうはならない。実行する度に答えが違う。

単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。

◆考察(ロックなし)/consideration

このプログラムが SMP で動いているとして、動きを 考えてえる。

thread_A()			thread_B()
	:				:
	:				:
29: x = shared_resource ;
				38: x = shared_resource ;
30: x = x + 1 ;
				39: x = x + 1 ;
31: shared_resource = x ;
				40: shared_resource = x ;
	:				:
	:				:
shared_resource++と書いてもだめ。複数の機械語命令に分割 されるので。

◆相互排除(mutual exclusion)

相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つ(at most one)にする。

プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。

◆Pthreadでの相互排除/mutual exclusion in Pthread

Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。

    pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
	:
	:
    pthread_mutex_lock( &murex1 );
        <際どい部分。critical section.>
    pthread_mutex_unlock( &mutex1 );

◆複数のスレッドによる共有資源のアクセス(ロック付き)/accessing a shared resource by multiple threads with locking

ロックなしのプログラムを、mutex を使っ て書き直したもの。
   1: /*
   2:  * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム
   3:  */
   4: 
   5: #include <stdio.h>
   6: #include <pthread.h>
   7: 
   8: void thread_A(), thread_B();
   9: int     shared_resource ;
  10: pthread_mutex_t mutex1 ;
  11: 
  12: main() {
  13:     pthread_t t1 ;
  14:     pthread_t t2 ;
  15:         shared_resource = 0 ;
  16:         pthread_mutex_init( &mutex1, NULL );
  17:         pthread_setconcurrency( 2 );
  18:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  19:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  20:         pthread_join( t1, NULL );
  21:         pthread_join( t2, NULL );
  22:         printf("main(): shared_resource == %d\n", shared_resource );
  23: }
  24: 
  25: void thread_A()
  26: {
  27:     int i, x ;
  28:         for( i = 0 ; i<1000000 ; i++ )
  29:         {
  30:             pthread_mutex_lock( &mutex1 );
  31:             x = shared_resource ;
  32:             x = x + 1 ;
  33:             shared_resource = x ;
  34:             pthread_mutex_unlock( &mutex1 );
  35:         }
  36: }
  37: 
  38: void thread_B() {
  39:     int i, x ;
  40:         for( i = 0 ; i<1000000 ; i++ ) {
  41:             pthread_mutex_lock( &mutex1 );
  42:             x = shared_resource ;
  43:             x = x + 1 ;
  44:             shared_resource = x ;
  45:             pthread_mutex_unlock( &mutex1 );
  46:         }
  47: }

◆実行結果(ロック付き)execution results with locking

% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/mutex-lock.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/Makefile [←]
% make mutex-lock [←]
gcc    -c -o mutex-lock.o mutex-lock.c
gcc mutex-lock.o -lpthread -o mutex-lock
% ./mutex-lock [←]
main(): shared_resource == 2000000
% ./mutex-lock [←]
main(): shared_resource == 2000000
% ./mutex-lock [←]
main(): shared_resource == 2000000
% []

◆考察(ロック付き)/consideration

このプログラムが SMP で動いているとして、動きを 考えてえる。
thread_A()				thread_B()

	:					:
30: pthread_mutex_lock( &mutex1 );		:
					41: pthread_mutex_lock( &mutex1 );
31: x = shared_resource ;		<他のスレッドが実行中なので
32: x = x + 1 ;				 この状態でしばらく待たされる>
33: shared_resource = x ;			:
34: pthread_mutex_unlock( &mutex1 );		:
	:				<実行再開>
	:				42: x = shared_resource ;
					43: x = x + 1 ;
					44: shared_resource = x ;
					45: pthread_mutex_unlock( &mutex1 );
後から来た thread_B() は、他のスレッドが実行中は、待たさ れる。

■Pthread 条件変数によるスレッド間の同期/Inter-thread synchronization in Pthread

スレッドでプログラムを作っていると、あるスレッドが別のスレッドの仕事の 完了を待つ必要が出がある。

◆Pthteadでの有限バッファ/bounded buffer in Pthread

Unix のパイプのようなことをスレッドを使って実行したい。
thread_A | thread_B
2つのスレッドの間には、 有限バッファ(bounded buffer)を置く。

バッファが空の時、thread_B() は、thread_A() が何かデータをバッファに入 れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッ ファから何かデータを取り出すのを待つ。

◆条件変数(condition variable)

条件変数(condition variable) で、ある条件が生じたことを待つ。

条件変数の操作 operations:

wait
ある条件が満たされるまで待つ wait for a condition
broadcast
ある条件が満たされたことを伝える。待っているスレッドが全て起き上がる。 Raise a condition. Unblocks all threads currently blocked on the specified condition variable.
signal
ある条件が満たされたことを伝える。待っているスレッドが1つだけ起き上がる。 Raise a condition. Unblocks at least one thread currently blocked on the specified condition variable.

◆条件変数を使った有限バッファ(bounded buffer with condition variables)

Concurrent Pascalのプログラムと同じ 手続き名と変数名(same procedure and variable names)
   1: 
   2: /*
   3:  * condv-buffer.c -- 条件変数を使った有限バッファ
   4:  */
   5: 
   6: #include <stdio.h>      /* printf() */
   7: #include <stdlib.h>     /* malloc(), exit() */
   8: #include <pthread.h>
   9: 
  10: #define BUFFER_SIZE     4               /* バッファの大きさ */
  11: struct circular_buffer
  12: {
  13:         int rp ;                        /* 読み出す位置 */
  14:         int wp ;                        /* 書き込む位置 */
  15:         int used ;                      /* バッファ内の要素数 */
  16:         int data[BUFFER_SIZE];          /* データを保存する場所 */
  17:         pthread_mutex_t mutex ;         /* この構造体の相互排除のための mutex */
  18:         pthread_cond_t  not_full ;      /* バッファが一杯ではない状態を待つための条件変数 */
  19:         pthread_cond_t  not_empty ;     /* バッファが空ではない状態を待つための条件変数 */
  20: };
  21: 
  22: void thread_A(struct circular_buffer *b);
  23: void thread_B(struct circular_buffer *b);
  24: 
  25: void put( struct circular_buffer *b,int x )
  26: {
  27:         pthread_mutex_lock( &b->mutex );
  28: loop:   if( b->used == BUFFER_SIZE )
  29:         {
  30:             pthread_cond_wait( &b->not_full,&b->mutex );
  31:             goto loop;
  32:         }
  33:         b->data[ b->wp++ ] = x ;
  34:         if( b->wp >= BUFFER_SIZE )
  35:             b->wp = 0 ;
  36:         b->used ++ ;
  37:         pthread_cond_broadcast( &b->not_empty );
  38:         pthread_mutex_unlock( &b->mutex );
  39: }
  40: 
  41: int get( struct circular_buffer *b )
  42: {
  43:     int x ;
  44:         pthread_mutex_lock( &b->mutex );
  45: loop:   if( b->used == 0 )
  46:         {
  47:             pthread_cond_wait( &b->not_empty,&b->mutex );
  48:             goto loop;
  49:         }
  50:         x = b->data[ b->rp++ ] ;
  51:         if( b->rp >= BUFFER_SIZE )
  52:             b->rp = 0 ;
  53:         b->used -- ;
  54:         pthread_cond_broadcast( &b->not_full );
  55:         pthread_mutex_unlock( &b->mutex );
  56:         return( x );
  57: }
  58: 
  59: main()
  60: {
  61:     pthread_t t1 ;
  62:     pthread_t t2 ;
  63:     struct circular_buffer *b  ;
  64:         b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer));
  65:         if( b == NULL )
  66:         {
  67:             perror("no memory for struct buffer\n");
  68:             exit( -1 );
  69:         }
  70:         b->rp = 0 ;
  71:         b->wp = 0 ;
  72:         b->used = 0 ;
  73:         pthread_mutex_init( &b->mutex, NULL );
  74:         pthread_cond_init( &b->not_full,NULL );
  75:         pthread_cond_init( &b->not_empty,NULL );
  76:         pthread_setconcurrency( 2 );
  77:         pthread_create( &t1, NULL, (void *)thread_A, (void *)b );
  78:         pthread_create( &t2, NULL, (void *)thread_B, (void *)b );
  79:         pthread_join( t1, NULL );
  80:         pthread_join( t2, NULL );
  81: }
  82: 
  83: void thread_A( struct circular_buffer *b )      /* producer */
  84: {
  85:     int i,x ;
  86:         for( i = 0 ; i<10 ; i++ )
  87:         {
  88:             x = i ;
  89:             printf("thread_A(): put( %d )\n",x );
  90:             put( b,x );
  91:         }
  92: }
  93: 
  94: void thread_B( struct circular_buffer *b )      /* consumer */
  95: {
  96:     int i, x ;
  97:         for( i = 0 ; i<10 ; i++ )
  98:         {
  99:             x = get( b );
 100:             printf("thread_B(): get() %d.\n", x );
 101:         }
 102: }

put() は、バッファにデータを追加する時に使う手続き。

基本的には、入口で pthread_mutex_lock() し、 出口で pthread_mutex_unlock() する。

バッファが一杯の時には、条件変数b->not_full で、 一杯でないという条件になるまで待つ。

待っている間は、mutex のロックは解除される。

pthread_cond_wait() からリターンして来る時には、もう一度 ロックされた状態に戻る。 pthread_cond_wait() でスリープしてある間に、 変数 (rp,wp,data)が書き換えられている可能性があるので、 もう一度最初から調べる。

get() は、バッファからデータを取り出す時に使う手 続き。put()とほぼ対称形。バッファが空の時に、wait し、バッファがもはや一杯ではないことをbroadcast する。

thread_A() は、10回バッファにデータを書き込むスレッド。 thread_B() は逆に、10回バッファからデータを読み出すス レッド。

◆実行結果 execution results

% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/condv-buffer.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/Makefile [←]
% make condv-buffer [←]
gcc    -c -o condv-buffer.o condv-buffer.c
gcc condv-buffer.o -lpthread -o condv-buffer
% ./condv-buffer  [←]
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
% []
複数のスレッドが同時に動いている。バッファにためられるのは、最大4なの に、put() が 5 回連続して成功しているように見える。printf() の順番と put(), get() の順番は違うことがある。

◆Concurrent Pascal と Pthread の比較 / comparison

共通点 common ideas 相違点 differences

◆種村流/Tanemura Style

    pthread_mutex_lock( &mutex );
    while( 1 )
    {
        if( condition_checking() )
            pthread_cond_wait( &cv, &mutex );
	perform();
    }

◆signal or broadcastか

バッファに要素を「1つずつ」追加しているので、 pthread_cond_signal() でもよい。 pthread_cond_broadcast() に変えても動くようにプログラムを 作る。

pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。

「1つずつ」ではなく、複数個同時に読み書きする時には、 pthread_cond_broadcast() でないとだめ。

迷った時には、pthread_cond_broadcast()

◆ダブルバッファリング(double buffering)

整数を1つバッファに書き込むだけでロック/アンロックを行なっていると、 実際の並列処理では重たい。ロックの回数を減らすために、ダブルバッファリ ングと呼ばれる技術がよく使われる。読み手と書き手で別々にバッファをもう け、1つのバッファの処理をしている間は、ロックを行なわない。

■再帰的 mutex (recursive mutex)

1つのスレッドで1つの mutex を複数回ロックしたい。

開いたモジュールでは一度外にだたスレッドがもう一度入ってくる。

図? 開いたモジュールと閉じたモジュール

◆標準mutexでのデッドロック/deadlock with a normal mutex

export している関数の入口で lock, 出口 unlockを入れて、 スレッド・セーフなモジュールを作りたい。 export している関数が、他の export している関数を呼び出すと、デッドロッ クになる。
   1: /*
   2:  * mutex-reclock-normal.c -- 通常の mutex を使う例(デッドロック)
   3:  */
   4: 
   5: #include <stdio.h>      /* printf() */
   6: #include <pthread.h>
   7: 
   8: void thread_A(), thread_B();
   9: int     shared_resource ;
  10: pthread_mutex_t mutex1 ;
  11: 
  12: deposit( int n )
  13: {
  14:         pthread_mutex_lock( &mutex1 );
  15:         shared_resource += n ;
  16:         pthread_mutex_unlock( &mutex1 );
  17: }
  18: 
  19: add_interest()
  20: {
  21:     int i ;
  22:         pthread_mutex_lock( &mutex1 );
  23:         i = shared_resource * 0.05 ;
  24:         deposit( i );
  25:         pthread_mutex_unlock( &mutex1 );
  26: }
  27: 
  28: main() {
  29:     pthread_t t1 ;
  30:     pthread_t t2 ;
  31:         shared_resource = 1000000 ;
  32:         pthread_mutex_init( &mutex1, NULL );
  33: 
  34:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  35:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  36:         pthread_join( t1, NULL );
  37:         pthread_join( t2, NULL );
  38:         printf("main(): shared_resource == %d\n", shared_resource );
  39: }
  40: 
  41: void thread_A()
  42: {
  43:         printf("thread_A(): deposit( 10000 ) ... \n");
  44:         deposit( 10000 );       
  45:         printf("thread_A(): deposit( 10000 ) done. \n");
  46: }
  47: 
  48: void thread_B()
  49: {
  50:         printf("thread_B(): add_interest() ... \n");
  51:         add_interest();
  52:         printf("thread_B(): add_interest() done. \n");
  53: }
実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/mutex-reclock-normal.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/Makefile [←]
% make mutex-reclock-normal [←]
gcc    -c -o mutex-reclock-normal.o mutex-reclock-normal.c
gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal
% ./mutex-reclock-normal  [←]
thread_A(): deposit( 10000 ) ... 
thread_B(): add_interest() ... 
thread_A(): deposit( 10000 ) done. 
^C (強制終了)
% []
% []

注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。

#CFLAGS=	-DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP
この結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c [←]

◆再帰的mutex/with recursive mutex

   1: /*
   2:  * mutex-reclock-recursive.c -- 再帰的 mutex を使う例
   3:  */
...
  28: static int
  29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex )
  30: {
  31:     pthread_mutexattr_t attr ;
  32:     int err ;
  33:         if( (err=pthread_mutexattr_init( &attr )) < 0 )
  34:             return( 0 );
  35:         if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 )
  36:             return( 0 );
  37:         err = pthread_mutex_init( mutex,&attr );
  38:         return( err );
  39: }
  40: 
  41: main()
  42: {
  43:     pthread_t t1 ;
  44:     pthread_t t2 ;
  45:         shared_resource = 1000000 ;
  46:         my_pthread_mutex_init_recursive( &mutex1 );
  47: 
  48:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  49:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  50:         pthread_join( t1, NULL );
  51:         pthread_join( t2, NULL );
  52:         printf("main(): shared_resource == %d\n", shared_resource );
  53: }
実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/mutex-reclock-recursive.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/Makefile [←]
% make mutex-reclock-recursive [←]
gcc    -c -o mutex-reclock-recursive.o mutex-reclock-recursive.c
gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive
% ./mutex-reclock-recursive  [←]
thread_A(): deposit( 10000 ) ... 
thread_A(): deposit( 10000 ) done. 
thread_B(): add_interest() ... 
thread_B(): add_interest() done. 
main(): shared_resource == 1060500
% ./mutex-reclock-recursive [←]
thread_A(): deposit( 10000 ) ... 
thread_A(): deposit( 10000 ) done. 
thread_B(): add_interest() ... 
thread_B(): add_interest() done. 
main(): shared_resource == 1060500
% []

注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。

#CFLAGS=	-DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP
この結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c [←]

◆stdio専用再帰的lock/ recursive locking for stdio

printf(), fputs() などの標準入出力(stdio)ライブラリ自体は、スレッド・ セーフなので、単体で使う分にはロックは不要である。 一連の入出力をまとめるために、ロックしなければならないことがある。
	printf("hello,world\n");
上と下は、結果が違う。
	printf("hello,");
	/* ここに他のスレッドの出力が混じることがある。
	   Other threads can print something here.
	*/
	printf("world\n");
これを避けるには、 flockfile(),funlockfile(),ftrylockfile()を使う。
	flockfile(stdout);
	printf("hello,");
	/* ここに他のスレッドの出力が混じることはない
	   Other threads cannot print something here.
	 */
	printf("world\n");
	funlockfile(stdout);
putchar()getchar()は、遅すぎる。 flockfile()/funlockfile()の中で使うための putchar_unlocked(),getchar_unlocked(),putc_unlocked(),getc_unlocked() が用意されている。printf_unsafe() はない。

■Pthreadとメモリ/Pthread and memory

◆auto変数/auto variables

各スレッドには、独立したスタックが割り当てられる。C言語の auto 変数 (auto variables, local variables)は、スレッドごとにコピーが作られる。

<−>再帰呼出し

スレッド間でポインタを渡す時には、スレッドの寿命にも注意。

◆static変数(static variables)

シングルスレッドのプログラムでは、static変数は、プログラムのモジュール 性(modularity) を高めるために有効に使われてきた。

マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。

◆static変数を使ったライブラリ関数/library functions that use static variables

TCP/IP でプログラムを書く時に使う gethostbyname() は、 static変数に値をセットして、その番地を返す。

struct hostent *gethostbyname( char *name ){
    static struct hostent ret ;
	.....
	return( &ret );
}

複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。

◆スレッド・セーフ(thread-safe)

複数のスレッドで呼び出してもきちんと動作することを、 スレッド・セーフ(thread-safe) という。 MT-Safe(multi-thread-safe)再入可能(reentrant) ということもある。

externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。

別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。

◆スレッド・セーフなインタフェース(thread-safe interface)

スレッド・セーフになるようにするには、インタフェースを変更する必要があ る。
Sun のマニュアルより:
struct hostent *gethostbyname(const char *name);

struct hostent *gethostbyname_r(const char *name,
     struct hostent *result, char *buffer, int buflen,
     int *h_errnop);

◆スレッド・セーフではない手続きを使う/Using thread-unsafe functions in multithread programs

一見無関係の手続きが内部で変数を共有している場合がある。

■Pthreadでのセマフォの利用/Pthread and semaphore

Pthread には、実時間機能を実現することを目的として、セマフォが使えるよ うになっている。実時間以外の目的でも、セマフォを使ってもよい。

次のような関数が利用可能である。

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
初期化。psharedが0だとプロセス内で有効。valueは初期値。
int sem_wait(sem_t * sem)
P命令。値を減らす。0の場合は止まる。
int sem_trywait(sem_t * sem)
非ブロックのsem_wait()。0でも止まらずエラーを返す。
int sem_post(sem_t * sem)
V命令。値を増やす。
int sem_getvalue(sem_t * sem, int * sval)
現在の値を返す。普通は役には立たない。次の瞬間には他のスレッドがP/Vしているかもしれないので。
int sem_destroy(sem_t * sem)
セマフォを破棄する。
注意: SystemV 由来のセマフォ(semget(),semop(),semctl())とは違う。

注意:名前付きのセマフォもある。sem_open() で作成/初期化し、 sem_unlink() で削除する。

注意:Solaris には、POSIX のセマフォとは別に、カーネル内でのデバイス・ ドライバ作成のためのセマフォが用意されている。

■Javaのスレッド/Threads in Java

Java には、最初から言語のレベルでスレッドの機能が入っている。
Java Pthread
new Thread(); start(); pthread_create()
join() pthread_join()
synchronized pthread_mutex_lock()とpthread_mutex_unlock()の組
wait() pthread_cond_wait()
wait(long timeout) pthread_cond_timedwait()
notify() pthread_cond_signal()
notifyAll() pthread_cond_broadcast()

Java の synchronized は、再帰可能(recursive)。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。

Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。

Pthreadでの有限バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。 生産者側と消費者側が同時に待つことはないという性質を利用する。

Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。

◆Concurrent Pascal と Java の比較(comparisons)

共通点 common ideas 相違点 differences

◆条件変数を使った有限バッファ(Java)

Concurrent Pascalのプログラムと同じ 手続き名と変数名(same procedure and variable names)
   1: 
   2: /*
   3:  * CircularBuffer.java -- Java による環状バッファ
   4:  */
   5: 
   6: class CircularBuffer
   7: {
   8:     static final int BUFFER_SIZE = 4 ;
   9:     int rp ;            // 読み出す位置
  10:     int wp ;            // 書き込む位置
  11:     int data[];         // データを保存する場所
  12:     int used ;          // バッファ内の要素数
  13:     CircularBuffer()
  14:     {
  15:         data = new int[BUFFER_SIZE];
  16:         rp = 0 ;
  17:         wp = 0 ;
  18:         used = 0;
  19:     }
  20: 
  21:     public synchronized void put( int x ) throws InterruptedException
  22:     {
  23:         while( used == data.length )
  24:             wait();
  25:         data[ wp++ ] = x ;
  26:         if( wp == data.length )
  27:             wp = 0 ;
  28:         used++ ;
  29:         notifyAll();
  30:     }
  31:     public synchronized int get() throws InterruptedException
  32:     {
  33:         int x ;
  34:         while( used == 0 )
  35:             wait();         
  36:         x = data[ rp++ ] ;
  37:         if( rp >= data.length )
  38:             rp = 0 ;
  39:         used--;
  40:         notifyAll();
  41:         return( x );
  42:     }
  43: }
   1: 
   2: /*
   3:  * CircularBufferDemo.java -- Java による有限バッファのデモ
   4:  */
   5: 
   6: class Thread_A implements Runnable // Producer
   7: {
   8:     CircularBuffer b;
   9:     Thread_A( CircularBuffer b )
  10:     {
  11:         this.b = b;
  12:     }
  13:     public void run()
  14:     {                           
  15:         int i,x ;
  16:         for( i = 0 ; i<10 ; i++ )
  17:         {
  18:             try
  19:             {
  20:                 x = i ;
  21:                 System.out.println("Thread_A(): put( "+x+" )");
  22:                 b.put( x );
  23:             }
  24:             catch( InterruptedException e )
  25:             {
  26:                 System.err.println("Thread_A(): Interrupted");
  27:                 break;
  28:             }
  29:         }
  30:     }
  31: }
  32: 
  33: class Thread_B implements Runnable // Producer
  34: {
  35:     CircularBuffer b;
  36:     Thread_B( CircularBuffer b )
  37:     {
  38:         this.b = b;
  39:     }
  40:     public void run()
  41:     {                           
  42:         int i,x ;
  43:         for( i = 0 ; i<10 ; i++ )
  44:         {
  45:             try
  46:             {
  47:                 x = b.get();
  48:                 System.out.println("Thread_B(): got() "+x+".");
  49:             }
  50:             catch( InterruptedException e )
  51:             {
  52:                 System.err.println("Thread_B(): Interrupted");
  53:                 break;
  54:             }
  55:         }
  56:     }
  57: }
  58: 
  59: class CircularBufferDemo
  60: {
  61:     public static void main(String argv[])
  62:     {
  63:         final CircularBuffer b = new CircularBuffer();
  64:         Thread t1 = new Thread( new Thread_A(b) );
  65:         t1.start();
  66:         Thread t2 = new Thread( new Thread_B(b) );
  67:         t2.start();
  68:         try
  69:         {
  70:             t1.join();
  71:             t2.join();
  72:         }
  73:         catch( InterruptedException e )
  74:         {
  75:             System.err.println("main(): Interrupted");
  76:         }
  77:     }
  78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/CircularBuffer.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/CircularBufferDemo.java [←]
% javac -encoding EUC-JP CircularBuffer.java CircularBufferDemo.java  [←]
% java CircularBufferDemo  [←]
Thread_A(): put( 0 )
Thread_A(): put( 1 )
Thread_A(): put( 2 )
Thread_A(): put( 3 )
Thread_A(): put( 4 )
Thread_B(): got() 0.
Thread_B(): got() 1.
Thread_B(): got() 2.
Thread_B(): got() 3.
Thread_B(): got() 4.
Thread_A(): put( 5 )
Thread_A(): put( 6 )
Thread_B(): got() 5.
Thread_A(): put( 7 )
Thread_A(): put( 8 )
Thread_A(): put( 9 )
Thread_B(): got() 6.
Thread_B(): got() 7.
Thread_B(): got() 8.
Thread_B(): got() 9.
% []

◆BlockingQueue

Java 2 Standard Edition (J2SE) 5.0 で、 java.util.concurrent パッケージに interface BlockingQueue が追加された。 これを使えば、自分で有限バッファを記述する必要はない。 具体的には、ArrayBlockingQueue を使う。 上限を指定しない BlockingQueue もあるが、利用しないことを奨める。

◆java.util.concurrent.locks package

Java 2 Standard Edition (J2SE) 5.0 で、次のようなクラスが追加された。
Lock
Pthread の mutex 相当。 ブロックを越えてロックできる。
Condition
Pthread の条件変数(condition variable)相当。 Lock と合わせて使う。
ReadWriteLock
読書きロック。 Pthread pthread_rwlock_t 相当。後述。

◆Java のセマフォ/Semaphore in Java

Java 2 Standard Edition (J2SE) 5.0 には、 java.util.concurrent.Semaphore クラスがある。

■Rubyのスレッド/Threads in Ruby

Ruby には、標準でスレッドの機能が入っているが、標準ではGiant VM lock (GVL) を保持して実行されるので、並列に実行されない。ただし、入出力を行 うと GVL をリリースするので、CPU 処理と複数の入出力処理を重ね合わること はできる。Parallel 等の拡張ライブラリを使うと、CPU処理を並列に実行する こともできる。
Ruby Pthread
Thread.new() { } pthread_create()
Thread#join() pthread_join()
Mutex#synchronize {} pthread_mutex_lock(), pthread_mutex_unlock()
Mutex#lock() Mutex#unlock()
ConditionVariable#wait(mutex) pthread_cond_wait()
ConditionVariable#wait(mutex,timeout) pthread_cond_timedwait()
ConditionVariable#signal() pthread_cond_signal()
ConditionVariable#broadcast() pthread_cond_broadcast()

Ruby の Mutex は、再帰可能ではない。再帰可能なもの、 PTHREAD_MUTEX_RECURSIVE 相当のものが必要ならば、Mutex ではなく Monitor/Sync を使う。

Pthreads のプログラムを、Mutex/Monitor/Sync と ConditionVariable を使っ て同じロジックで Ruby のプログラムに書き直せる。

◆Concurrent Pascal と Ruby の比較(comparisons)

共通点 common ideas 相違点 differences

◆条件変数を使った有限バッファ(Ruby)

Concurrent Pascalのプログラムと同じ 手続き名と変数名(same procedure and variable names)
   1: #!/usr/bin/env ruby
   2: # -*- coding: utf-8 -*-
   3: # condv-buffer-ruby.rb -- Circular Buffer in Ruby
   4: 
   5: require 'thread'
   6: 
   7: class CircularBuffer
   8:     BUFFER_SIZE = 4
   9:     def initialize()
  10:         @data = Array.new(BUFFER_SIZE)
  11:         @rp = 0
  12:         @wp = 0
  13:         @used = 0
  14:         @mutex = Mutex.new()
  15:         @not_full  = ConditionVariable.new()
  16:         @not_empty = ConditionVariable.new()
  17:     end
  18:     def put( x )
  19:         @mutex.synchronize {
  20:             while( @used == @data.length() )
  21:                 @not_full.wait( @mutex )
  22:             end
  23:             @data[ @wp ] = x
  24:             @wp += 1
  25:             if( @wp >= BUFFER_SIZE )
  26:                 @wp = 0
  27:             end
  28:             @used += 1
  29:             @not_empty.broadcast()
  30:         }
  31:     end
  32:     def get()
  33:         @mutex.synchronize {
  34:             while( @used == 0 )
  35:                 @not_empty.wait( @mutex )
  36:             end
  37:             x = @data[ @rp ]
  38:             @rp += 1
  39:             if( @rp >= BUFFER_SIZE )
  40:                 @rp = 0
  41:             end
  42:             @used -= 1
  43:             @not_full.broadcast()
  44:             return( x )
  45:         }
  46:     end
  47: end
  48: 
  49: def main()
  50:         b = CircularBuffer.new()
  51:         t1 = Thread.new() {
  52:             thread_A( b )
  53:         }
  54:         t2 = Thread.new() {
  55:             thread_B( b )
  56:         }
  57:         t1.join()
  58:         t2.join()
  59: end
  60: 
  61: def thread_A( b ) # producer
  62:         i = 0
  63:         while( i < 10 )
  64:             x = i
  65:             printf("thread_A(): put( %d )\n",x )
  66:             b.put( x );
  67:             i += 1
  68:         end
  69: end
  70: 
  71: def thread_B( b ) # consumer
  72:         i = 0
  73:         while( i < 10 )
  74:             x = b.get()
  75:             printf("thread_B(): get() %d.\n", x );
  76:             i += 1
  77:         end
  78: end
  79: 
  80: main()
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2017/2017-10-13/ex/condv-buffer-ruby.rb [←]
% ruby condv-buffer-ruby.rb [←]
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_A(): put( 9 )
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_B(): get() 8.
thread_B(): get() 9.
% []

◆Thread::SizedQueue

Ruby の Thread::SizedQueue を使えば、自分で有限バッファを記述する必要は ない。キューの要素数の上限を指定しない Thread::Queue もあるが、利用しな いことを奨める。

◆Ruby parallel

Ruby gem にある parallel ライブラリは、fork() を使っているので重たいが、 並列処理はできる。

■課題(assignment)2 マルチスレッド・プログラミング(1)/ Multithread programming (1)

以下の問題の中で、次のものを選び、回答しなさい。 Choose one question and answer it from the following list: レポートには、次のものを含めなさい。 A report must include following. この内容を含む「テキスト」ファイルを作成し、 レポート提出ページ から提出しなさい。

締切りは、2017年10月18日、 23:59:59 とする。

★問題(201) スレッドの数/ number of threads

相互排除のプログラムや条件変数プログラムでスレッドの数を増やして実行し てみなさい。 The multithread programs using mutex and condition variables creates one or two new threads. Rewrite these programs that creates three or more threads and execute them.

★問題(202) 手続きとスレッド/ procedure and threads

1つの手続き(C言語の関数)から複数のスレッドを生成してみなさい。 Create multiple threads from a single procedure (function in C).

★問題(203) ダブルバッファリング/ double buffering

循環バッファのプログラムダブルバッファリングを行なうプログラムに 変更しなさい。 Rewrite the program of a bounded buffer to perform double buffering.

★問題(204) 条件変数の削減/ removing condition variables

循環バッファのプログラムで、条件変数を1つになるように変更しなさい。 一度に wait する必要があるのは、put() 側か get() のどちらか一方だけである。 よって、両方とも同じキューにつないでも、動作する。

The program of bounded buffer uses two condition variables. Rewrite this program to use a single condition variables. Since a thread can block either on put() or get(), we can share a single queue of a condition variable.

★問題(205) Java または Rubyにおける再帰的モニタの確認/ Recursive mutex in Java or Ruby

再帰的mutexの例題で使った Pthread のプログラムを Java または Ruby で書き直しなさい。 Rewrite the program of recursive mutex in C into the program in Java or Ruby.

★問題(206) 残高照会と引き出し機能の追加/Adding balance() and withdraw()

再帰的mutexの例題で使ったプ ログラムに、口座の残高を返す手続きと口座から現金を引き出す手続きを付け なさい。ただし、現金の引出しでは、残高が負にならないようにしなさい。 プログラミング言語としては、C、Java、または、Ruby を使いなさい。

Add following two functions to the program of recursive mutex . The first function returns the balance of the bank account. The second function withdraws the given amount of money from the bank account. If the bank account does not have sufficient money, this function should nothing. You may use the programming language C, Java, or Ruby.

int balance()
{
...
// return the balance 残高を返す
}
int withdraw( int n )
{
...
// return 1 if succeeded. 成功の時は 1 を返す
// return 0 if failed due to insufficient money. 残高不足の時は 0
}

プログラミング言語としては、C、Java、または、Ruby を使いなさい。 You may use the programming language C, Java, or Ruby.

★問題(207) 複数要素を受付ける有限バッファ/ multi-item bounded buffer

条件変数を使った有限バッファのプログラムのうち、 (C言語Pthread版) 、または、 (Java版) 、または、 (Ruby版) 、または、 を書き換えて、一度に n 個(n は可変)の要素を受付けるようにしなさい。 Rewrite the bounded buffer programs in (C with Pthread), (Java) or (Ruby) to take n items at one. すなわち、

作成した有限バッファに対して複数のスレッドがput(n,x[]) した時、x[] の要 素がインターリーブされることがあるか? たとえば、2つのスレッドが次のよう な操作をしたとする。 When multiple threads calls the functin put(n,x[]), can the items in each x[] be interleved or not? For example, consider the following actions. このデータをスレッド3で1個ずつ get() した時に、インターリーブされないな らば、必ず次のいずれかの結果になる。 Thread 3 takes items with get(). If the items are not interleaved, the results will be either of the following. インターリーブされたとすると、次のような結果が出てくることがある。 If the items are interleaved, the output can be as follows.

★問題(208) 循環バッファをセマフォで実現する/ bounded buffer with semaphores

循環バッファのプログラム ( C + Pthread Java , または Ruby )を、セマフォを使って書き直しなさい。

Rewrite the bounded buffer programs (in C with Pthread Java, or Ruby ) to use semaphores.

プログラミング言語としては、C、Java、または、Ruby を使いなさい。 You may use the programming language C, Java, or Ruby.

セマフォを使った循環バッファのプログラムの作成は、次の教科書の練習問 題にもなっている(巻末に回答もある)。 The following text book of operating systems contains exercise of a bounded buffer. The answer is also included in the end of the book.

清水謙多郎: "オペレーティングシステム",岩波書店 (1992). ISBN: 4000078526.

★問題(209) Javaのjava.util.concurrent.*の利用/ Using java.util.concurrent.* in Java

このページで示した例題、または、練習問題を、Java 言語で次のクラス、また はインタフェースを1つ以上用いて実装しなさい。 Use one or more following classes or interfaces and write a program in these exercises or examples in this page.
Last updated: 2017/10/13 13:00:11
Yasushi Shinjo / <yas@cs.tsukuba.ac.jp>