並行・並列・分散プログラミング、マルチスレッド・プログラミング/concurrent,parallel,and distributed programming,and Multithread programming

並行システム

                               システム情報系情報工学域,
			       システム情報工学研究科コンピュータサイエンス専攻
                               新城 靖
                               <yas@cs.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/

■連絡事項

科目番号

  1. 01CH303: 一般コース
  2. 01CJ217: 高度IT専修コース

成績の付け方(前半)。次のものを合算する。

欠席した日についても、レポートを提出すること。 印刷物は、配布しない。必要なら自分で印刷しなさい。

■今日の重要な話

用語の意義 ハードウェア マルチスレッド・プログラミング 成績評価には、次のものを合算する。 欠席した日にレポートやクイズが出題されていれば、それも提出すること。 印刷物は、配布しない。必要なら自分で印刷しなさい。初回は特別に配布する。

■並行・並列・分散プログラミング(concurrent, parallel, and distributed programming)

◆逐次プログラミングと並行プログラミング(sequential and concurrent programming)

逐次 sequential
1度に1つの手続きが実行される
並行 concurrent
1度に複数の手続きが実行される
main()
{
	printf("hello, world!\n"); 
}
逐次プログラムでは、1度に1つの手続き(関数)(procedure(function))しか実行 されない。printf() が動くと main() は止まる。

複数の手続きを実行する主体は、「プロセス(process)」や「スレッド (thread)」。

縦軸時間、横軸空間、スレッドが協調している

図? 逐次プログラムと並行プログラムの動作(時間と空間の観点から)

並列処理と分散処理の目的

並列 parallel
1つの問題を(逐次処理より)速く(faster)解きたい
分散 distributed
逐次処理は非常に特殊。 並行プログラミング(並列、分散、その他)の共通の話 単一プロセッサでの並行プログラミングもある。後述。 concurrent programming in a single processor.

◆ソフトウェアの記述

2つの言語が必要
計算言語 computational languages
個々の活動を記述
協調言語 coordination languages
統一されたプログラムに組み立てるための「糊(glue)」
Java など、1つの言語で計算と協調が書けるものもある。

■ハードウェア / hardware

◆シングルプロセッサと集中型システム / single processors and centralized systems

シングルプロセッサは、1台のコンピュータにプロセッサが CPU が 1 つ、メ モリも 1つ。

ネットワークで接続された複数のコンピュータ(multiple computers that are connected with a network)から構成される分散システム(distributed systems)と対比する時には、集中型システム(centralized systems))と呼ばれ る。

技術的に確立している。

CPU、メモリ、I/O、OS、アプリケーション

図? 集中型システムのハードウェア (hardware of a centralized system)

◆マルチプログラミング multiprogramming in single processor

1台のシングルプロセッサのメモリに、同時に複数のプログラムをロードして切り替 えながら動作させる。

1950年代、60年代のハードウェアは、高い。

特に高いCPUを有効に活用したい。

バッチ処理の誕生。入出力とCPU処理を「並列」動作させる。

ジョブが3つ、入力、処理、出力、異なるものは重なってもよい。

図? バッチ処理における CPU と入出力装置の並列動作 (parallel processing of CPU and I/O in batch processeing)

プロセスの概念の確立。プロセスとプログラムの分離。 一般のプログラミングは、逐次のまま。

OSだけ並行プログラミング

今でもOSの教科書には、セマフォやデッドロックの話が出ている。

Concurrent Pascal, Solo。

OSの実装では、実際には、「割り込み禁止(disable interrupts)」という軽 量の相互排除命令が多用された。

◆TSS (Time sharing system)

Time sharing system。

1台の中央の大型コンピュータ(mainframe)に、複数の「(文字)端末 (terminal)」を接続。

端末を使っている人からすると専有しているように見える。

TSS で複数のアプリケーションを同時に走らせる。 CPU が貴重な時代。CPU を遊ばせない。

メインフレーム1台、端末n台

図? メインフレームと端末(a mainframe and terminals)

◆Virtual Machine

1台のメインフレームに、同時に複数のOSを走らせるのが始り。 TSS の実装方法の1つでもあった。

最近は、 コンピュータの高速化で、性能が余ってきた。もともとハードウェアn台でやっ ていた仕事を、ハードウェアとしては1台に集約する。並列処理の逆。

図? サーバ3台、OS3種類、アプリケーションそれぞれ2つ

図? 仮想計算機によるサーバの集約(集約前) (before server consolidation by virtual machines)

図? ハードウェア1台、OS3種類、アプリケーションそれぞれ2つ

図? 仮想計算機によるサーバの集約(集約後) (after server consolidation by virtual machines)

1999年VMware Workstation 以降、パソコン用の VM も広く使われる。

◆並列コンピュータ(parallel computers)

Flynn の分類 (Flynn's taxonomy)

MIMD の分類

Machシステムでの分類 ( Mach operating system kernel, Mach microkernel )

データフローコンピュータ(data flow computer)。関数型プログラミング (functional programming)。論理型プログラミング(logic programming)。第五 世代コンピュータ(the fifth generation computer)。

逐次に対して、どうやって並列性(parallelism)を抽出するかが主眼。 逐次プログラムでは、マルチプロセッサで走らせても1倍。

特定用途のマルチプロセッサ(special purpose parallel computer)。

スパコン(super computers)。ベクトルコンピュータ(vector computers)。

Occam/Transputer。CSPの実装。

MMU (memory management unit)の活用。分散共有メモリ(distributed shared memory, distributed virtual memory)。

◆共有メモリ型マルチプロセッサ、SMP (Shared memory multiprocessor)

(CPU+Cache)*n+メ
モリ

図? 共有メモリ型マルチプロセッサ(バス共有) (Shared memory multiprocessor (with a shared bus))

◆クロスバースイッチ(crossbar switch)

n*CPU-n*Memory

図? 共有メモリ型マルチプロセッサ(クロスバースイッチ) (shared memory multiprocessor with a crossbar switch)

複数のCPUが別のメモリに同時にアクセスできる。 同じメモリならバスと同様に衝突する。

◆共有メモリ型マルチプロセッサとスレッド(shared memory multiprocessor and threads)

共有メモリ型マルチプロセッサをうまく使いたいが、fork() は、重たい。 軽いプロセス(lightweight processes)、スレッド(threads)の研究。

OSは、ジャイアント・ロック(giant lock)。

◆非均質共有メモリ型マルチプロセッサ(shared memory multiprocessor with non-uniform memory access)

CPU、メモリ、相互結合ネットワーク

図? 非均質共有メモリ型マルチプロセッサ (shared memory multiprocessor with non-uniform memory access)

相互結合ネットワークで、遠隔のメモリを CPU から直接アドレスでアクセス できる。ただし、速度は 100 倍くらい遅い。

◆LANに接続されたコンピュータ群(multiple computers connected to a LAN)

PC*3--hub

図? LANに接続されたPC

遠隔のメモリは、アクセスできない(no remote memory access)。 機種が違う(heterogeneous)こともある。

■スレッド・プログラミング / multithread programming

◆スレッドとは / What is a thread

スレッド(thread) あるいは、 軽量プロセス(lightweight processes) とは、 1つの保護の単位(unit of protection)としての プロセス(タスク,あるいは,アドレス空間; a process, task, or address space) 内部にふくまれている平行処理の単位(unit of concurrent processing) (論理的な並列処理の単位(unit of logical parallel processing))。

シングルスレッドのプログラム a single thread program
1度に1つの手続き(Cの関数)しか動かない。 runs one function at the same time
マルチスレッドのプログラム a multithread program
1度にスレッドの数だけの手続きが論理的には同時に動く。 runs two or more functions at the same time (同期でブロックされているものも含む)

図? シングルスレッドのプロセスとマルチスレッドのプロセス

図? シングルスレッドのプロセスとマルチスレッドのプロセス / A process with a single thread and a process with multiple threads

軽量プロセスというと、内部にループを含むような語感がある。 A lightweight process can contain a loop in it.

◆スレッドの利用目的/ purpose of multithread programing

◆本当にスレッドが必要か/"Why Threads Are A Bad Idea (for most purposes)

John K. Ousterhout, "Why Threads Are A Bad Idea (for most purposes)", Invited Talk at the 1996 USENIX Technical Conference (January 25, 1996). [PDF]

マルチスレッドプログラミングは、非常に難しい。

シングルスレッドのイベント駆動で書けるなら、その方がいい。 GUI、分散など。

どこでスレッドを使うべきか。

◆スレッドの内容/thread contents

個々のスレッドごとに持つもの each thread has プロセス全体で共有 shared among threads in a process

◆スレッドの操作 thread operations

◆Unix のユーザ・レベルの(重たい)プロセスの操作/ operations of heavyweight processes in Unix

Unixカーネル内の並行処理/ concurrent processing in a Unix kernel

PDP-11 時代からのモデル

■Concurrent Pascal

逐次型プログラミング言語 Pascal を拡張したもの。 Extension of a sequential programming language, Pascal.

◆参考文献 references

Per Brinch Hansen: "The Architecture of Concurrent Program", Prentice Hall (1977).

Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).

Per Brinch Hansen: "The programming language Concurrent Pascal," IEEE Transactions on Software Engineering, Vol.SE-1, No.2, pp.199-207, June 1975.

◆プロセス process

プロセスは、データ型(data type)として定義。 変数がインスタンス。内部に無限ループ(infinite loop)
type procA_t = process(args...);
   var
      local variables...
   procedure proc1(args...);
   procedure proc2(args...);
begin
   cycle
       ...
   end;
end

var procA1 : procA_t ;
init procA1(args...);

◆モニタ monitor

モニタは、プロセス間で共有されるデータ構造。 a monitor is a shared data structure among processes.
type monA_t = monitor(args ・・・);
   var
      loal variables
   procedure entry proc1(args ・・・);
   procedure entry proc2(args ・・・);
begin
   initialization of local variables;
end

var monA1 : monA_t ;
init monA1(引数);
ローカル変数は、entry の手続きでしかアクセスできない。 1 つのプロセスが entry の手続きを実行中は、他のプロセスは入れない(mutual exclusion)。

図? プロセス3個、モニタ1個、entry手続き3個、局所変数

図? Concurrent Pascalのプロセスとモニタ/Processes and a monitor in Concurrent Pascal

◆条件変数とキュー condition variable and queue

  cv1 : condition;
  cv1.wait;      呼び出したプロセスを待たせる。block the current process.
  cv1.signal;   待っているプロセスがいれば全て再開させる。unblock all waiting processes.
  q1 : queue;
  delay(q1);      呼び出したプロセスをそのキューで待たせる。
                  block the current process.
  continue(q1);   そのキューで待っているプロセスがいれば1つだけ再開させる。
                  unblock a single process in the queue.
最近の言語(Pthread, Java 等)では、Concurrent Pascal の condition variable もqueue も、ともに条件変数。

◆有限バッファ bounded buffer

Unix のパイプ(pipe)のようなことを Concurrent Pascal のプロセス(スレッド)を使って実行したい。
$ producer | consumer [←]
2つのスレッドの間には、バッファを置く。

図? 環状バッファ(有限バッファ)、生産者、消費者

図? 有限バッファ(環状バッファ)、生産者、消費者 / bounded buffer (circular buffer), producer, consumer

バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。

手続き procedures

定数と変数 Constants and variables
   1: const BUFFER_SIZE = 4;
   2: type circular_buffer =
   3: monitor
   4: var
   5:     rp : integer ;
   6:     wp : integer ;
   7:     data: array [0..BUFFER_SIZE-1] of integer;
   8:     used: integer;
   9:     not_empty : condition;
  10:     not_full  : condition;
  11: 
  12:     procedure entry put(x:integer);
  13:     begin
  14:         while( used = BUFFER_SIZE ) do
  15:             non_full.wait;
  16:         data[wp] := x;
  17:         wp := wp + 1 ;
  18:         if( wp >= BUFFER_SIZE )
  19:             wp := 0 ;
  20:         used := used + 1 ;
  21:         not_empty.signal;
  22:     end
  23: 
  24:     procedure entry get(result x:integer);
  25:     begin
  26:         while( used = 0 ) then
  27:             not_empty.wait;
  28:         x := data[rp];
  29:         rp := rp + 1 ;
  30:         if( rp >= BUFFER_SIZE )
  31:             rp := 0 ;
  32:         used := used - 1 ;
  33:         not_full.signal;
  34:     end
  35: begin
  36:     rp := 0 ;
  37:     wp := 0 ;
  38:     used := 0 ;
  39: end;
  40: 
  41: ...
  42: var buf : circular_buffer ;
  43: init buf;
  44: ...
  45: 

■Pthread

Pthread は、POSIX 1003.1c-1995という標準に準拠したスレッド・ライブラリ。 POSIX Thread とも呼ばれる。

◆Pthreadを利用したプログラムのコンパイル/compiling pthread programs

Pthread を利用したプログラムを書く時には、次のようなヘッダ・ファイルを 読み込む。

#include <pthread.h>

Linux, MacOSX, でのコンパイルとリンク。-lpthread を付ける。

% cc -o create create.c -lpthread [←]
% ./create [←]
...
% []

Solaris (Unix International系)でのコンパイルとリンク。 -D_REENTRANT-lpthread を付ける。

% cc -D_REENTRANT -o create create.c -lpthread [←]
% ./create [←]
...
% []
セマフォを使う時、Solaris 6 (SunOS 5.6) では、リンク時に -lposix4 オプションを付ける。Solaris 7-10 (SunOS 5.7, 5.8, 5.9, 5.10) では、リンク時に -lrt オプションを付ける。

■Pthreadにおけるスレッドの生成・消滅/creation and termination of threads in Pthread

スレッドは、普通のプログラムの、 サブルーチン(C言語の関数、手続き; subroutine, C functions, procedures) に近い。 サブルーチンの場合,呼び出すと、呼び出された方が動き、自分自身は,止ま る。スレッドでは,新たにスレッドを生成した場合,生成した方と生成さ れた方は,論理的には2つとも同時に動く。

◆fork-joinモデル/fork-join model

図? fork-joinモデルの実現

図? fork-joinモデルの実現

  1. 逐次処理(スレッド/プロセスが1つ)の状態から始まる start with sequential processing
  2. 並列性が必要になった時、fork命令で複数のスレッド/プロセスに分か れて並列処理を行う。 fork when parallel processing is needed.
  3. 並列に動作できる部分が終ると join 命令で再び逐次処理に戻る。 join after parallel processing ends.

◆Unixのfork()/fork() in Unix

fork() システムコールでコピーが作られる。 join の代わりに、子どもは exit()、親は wait()。

◆Pthreadはcreate()/create() in Pthread

Pthread では、コピーではなく create で新たにスレッドを作る。同じ関数を 実行したい時には、直接 call する。(別の関数を実行するなら呼ばなくても よい。) 子スレッドでは、pthread_exit() (トップの 手続きからリターン)、 親は、pthread_join() する。

後で join する必要がない時には、pthread_detach() を使って切り離す。 (joinしなくてもゾンビが残らない。)

◆スレッドの生成とjoin/thread creation and join

This program creates two new threads.
   1: 
   2: /*
   3:         create-join-2.c -- スレッドを2つ作るプログラム
   4: */
   5: 
   6: #include <stdlib.h>     /* malloc() */
   7: #include <stdio.h> /* printf() */
   8: #include <pthread.h>
   9: 
  10: struct func1_arg {
  11:         int     x;
  12: };
  13: struct func2_arg {
  14:         int     x;
  15: };
  16: 
  17: void func1( struct func1_arg *arg );
  18: void func2( struct func2_arg *arg );
  19: 
  20: main()
  21: {
  22:     pthread_t t1 ;
  23:     pthread_t t2 ;
  24:     struct func1_arg *arg1;
  25:     struct func2_arg *arg2;
  26:     
  27:         printf("main()\n");
  28:         arg1 = malloc( sizeof(struct func1_arg) );
  29:         if( arg1 == NULL )
  30:         {
  31:             perror("no memory for arg1");
  32:             exit( 1 );
  33:         }
  34:         arg1->x = 10;
  35:         pthread_create( &t1, NULL, (void *)func1, (void *)arg1 );
  36:         arg2 = malloc( sizeof(struct func2_arg) );
  37:         if( arg2 == NULL )
  38:         {
  39:             perror("no memory for arg2");
  40:             exit( 1 );
  41:         }
  42:         arg2->x = 20;
  43:         pthread_create( &t2, NULL, (void *)func2, (void *)arg2 );
  44:         pthread_join( t1, NULL );
  45:         pthread_join( t2, NULL );
  46: }
  47: 
  48: void func1( struct func1_arg *arg )
  49: {
  50:     int i ;
  51:         for( i = 0 ; i<3 ; i++ )
  52:         {
  53:             printf("func1( %d ): %d \n",arg->x, i );
  54:         }
  55: }
  56: 
  57: void func2( struct func2_arg *arg  )
  58: {
  59:     int i ;
  60:         for( i = 0 ; i<3 ; i++ )
  61:         {
  62:             printf("func2( %d ): %d \n",arg->x, i );
  63:         }
  64: }
実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/create-join-2.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/Makefile [←]
% make create-join-2 [←]
gcc    -c -o create-join-2.o create-join-2.c
gcc create-join-2.o -lpthread -o create-join-2
% ./create-join-2  [←]
main()
func1( 10 ): 0 
func2( 20 ): 0 
func1( 10 ): 1 
func2( 20 ): 1 
func1( 10 ): 2 
func2( 20 ): 2 
% ./create-join-2 [←]
main()
func1( 10 ): 0 
func1( 10 ): 1 
func1( 10 ): 2 
func2( 20 ): 0 
func2( 20 ): 1 
func2( 20 ): 2 
% ./create-join-2 [←]
main()
func1( 10 ): 0 
func1( 10 ): 1 
func1( 10 ): 2 
func2( 20 ): 0 
func2( 20 ): 1 
func2( 20 ): 2 
% []
この例では、次の3つのスレッドが作られる。Three threads were created.
  1. main を実行しているスレッド
  2. func1 から作られたスレッド t1
  3. func2 から作られたスレッド t2
マルチスレッド・プログラミングでは、main関数もまた1つのスレッドが実行 していると考える。これを 初期スレッド(initial thread) 、あるいは、 メインスレッド(main thread) とよぶ。PThread では、 メインスレッド以外のスレッドは、 pthread_create() により作られる。

どういう順序で実行されるかは、決まっていない(nondeterministic)。 スレッドは、もともと順番を決めないような処理、 非同期的(asynchronous) な処理を表現するためのもの。どうしても順序を決めたければ、 mutex や条件変数といった同期機能を使う。

pthread_create()で指定された関数からリターンすると、そ のスレッドが終了する。pthread_exit() を呼び出してもよい。 ただし、 初期スレッド が終了すると、プロセス全体が終了する。 exit() システムコールを呼び出しても終了する。

■Pthread mutexによるスレッド間の同期/inter-thread synchronization with mutex in Pthread

◆共有資源/shared resource

複数のプロセス/スレッドで共有しなければならないもの。

プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。

主記憶やCPU時間といった資源は、横取り(preemption)しても平気なので、あま り問題になりない。

スレッドの場合は、プログラミング言語の変数(variables)。

変更されない変数(immutable variables)の値を読む(read-only)だけなら、特 に問題は起きない。それ意外の時、特に、複数のスレッドで値を読み書きする 時に問題が起きる。

◆複数のスレッドによる共有資源のアクセス(ロックなし、バグ入り)/accessing a shared resource by multiple threads without locking (with a bug)

In this program, two threads access a shared resource without locking.
   1: 
   2: /*
   3:  * mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム
   4:  */
   5: 
   6: #include <stdio.h>
   7: #include <pthread.h>
   8: 
   9: void thread_A(), thread_B();
  10: int     shared_resource ;
  11: 
  12: main() {
  13:     pthread_t t1 ;
  14:     pthread_t t2 ;
  15:         shared_resource = 0 ;
  16:         pthread_setconcurrency( 2 );
  17:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  18:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  19:         pthread_join( t1, NULL );
  20:         pthread_join( t2, NULL );
  21:         printf("main(): shared_resource == %d\n", shared_resource );
  22: }
  23: 
  24: void thread_A()
  25: {
  26:     int i, x ;
  27:         for( i = 0 ; i<1000000 ; i++ )
  28:         {
  29:             x = shared_resource ;
  30:             x = x + 1 ;
  31:             shared_resource = x ;
  32:         }
  33: }
  34: 
  35: void thread_B() {
  36:     int i, x ;
  37:         for( i = 0 ; i<1000000 ; i++ ) {
  38:             x = shared_resource ;
  39:             x = x + 1 ;
  40:             shared_resource = x ;
  41:         }
  42: }

pthread_setconcurrency() は、利用したい CPU 数をシステムに要求するもの。

◆実行結果(ロックなし)/execution results without locking

SMP の Solaris, MacOSX での実行結果。 Execution results on an SMP with Solaris or Mac OS X.
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/mutex-nolock.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/Makefile [←]
% make mutex-nolock [←]
gcc    -c -o mutex-nolock.o mutex-nolock.c
gcc mutex-nolock.o -lpthread -o mutex-nolock
% ./mutex-nolock  [←]
main(): shared_resource == 1014838
% ./mutex-nolock [←]
main(): shared_resource == 1062224
% ./mutex-nolock [←]
main(): shared_resource == 1096882
% ./mutex-nolock [←]
main(): shared_resource == 1091488
% ./mutex-nolock [←]
main(): shared_resource == 1096291
% []
thread_A(), thread_B()と2つのスレッドが作 られている。整数型の変数 shared_resource は、その2つの スレッドの両方からアクセスされる。2つのスレッドで合計2000000 増える予 定だが、実際に実行すると、そうはならない。実行する度に答えが違う。

単一プロセッサの Linux 等では、何度か実行しても常に 2000000 だけ増える ことがある。

◆考察(ロックなし)/consideration

このプログラムが SMP で動いているとして、動きを 考えてえる。

thread_A()			thread_B()
	:				:
	:				:
29: x = shared_resource ;
				38: x = shared_resource ;
30: x = x + 1 ;
				39: x = x + 1 ;
31: shared_resource = x ;
				40: shared_resource = x ;
	:				:
	:				:
shared_resource++と書いてもだめ。複数の機械語命令に分割 されるので。

◆Pthreadでの相互排除/mutual exclusion in Pthread

Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。

    pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
	:
	:
    pthread_mutex_lock( &murex1 );
        <際どい部分。critical section.>
    pthread_mutex_unlock( &mutex1 );

◆相互排除(mutual exclusion)

相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つ(at most one)にする。

プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。

◆複数のスレッドによる共有資源のアクセス(ロック付き)/accessing a shared resource by multiple threads with locking

ロックなしのプログラムを、mutex を使っ て書き直したもの。
   1: /*
   2:  * mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム
   3:  */
   4: 
   5: #include <stdio.h>
   6: #include <pthread.h>
   7: 
   8: void thread_A(), thread_B();
   9: int     shared_resource ;
  10: pthread_mutex_t mutex1 ;
  11: 
  12: main() {
  13:     pthread_t t1 ;
  14:     pthread_t t2 ;
  15:         shared_resource = 0 ;
  16:         pthread_mutex_init( &mutex1, NULL );
  17:         pthread_setconcurrency( 2 );
  18:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  19:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  20:         pthread_join( t1, NULL );
  21:         pthread_join( t2, NULL );
  22:         printf("main(): shared_resource == %d\n", shared_resource );
  23: }
  24: 
  25: void thread_A()
  26: {
  27:     int i, x ;
  28:         for( i = 0 ; i<1000000 ; i++ )
  29:         {
  30:             pthread_mutex_lock( &mutex1 );
  31:             x = shared_resource ;
  32:             x = x + 1 ;
  33:             shared_resource = x ;
  34:             pthread_mutex_unlock( &mutex1 );
  35:         }
  36: }
  37: 
  38: void thread_B() {
  39:     int i, x ;
  40:         for( i = 0 ; i<1000000 ; i++ ) {
  41:             pthread_mutex_lock( &mutex1 );
  42:             x = shared_resource ;
  43:             x = x + 1 ;
  44:             shared_resource = x ;
  45:             pthread_mutex_unlock( &mutex1 );
  46:         }
  47: }

◆実行結果(ロック付き)execution results with locking

% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/mutex-lock.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/Makefile [←]
% make mutex-lock [←]
gcc    -c -o mutex-lock.o mutex-lock.c
gcc mutex-lock.o -lpthread -o mutex-lock
% ./mutex-lock [←]
main(): shared_resource == 2000000
% ./mutex-lock [←]
main(): shared_resource == 2000000
% ./mutex-lock [←]
main(): shared_resource == 2000000
% []

◆考察(ロック付き)/consideration

このプログラムが SMP で動いているとして、動きを 考えてえる。
thread_A()				thread_B()

	:					:
30: pthread_mutex_lock( &mutex1 );		:
					41: pthread_mutex_lock( &mutex1 );
31: x = shared_resource ;		<他のスレッドが実行中なので
32: x = x + 1 ;				 この状態でしばらく待たされる>
33: shared_resource = x ;			:
34: pthread_mutex_unlock( &mutex1 );		:
	:				<実行再開>
	:				42: x = shared_resource ;
					43: x = x + 1 ;
					44: shared_resource = x ;
					45: pthread_mutex_unlock( &mutex1 );
後から来た thread_B() は、他のスレッドが実行中は、待たさ れる。

■Pthread 条件変数によるスレッド間の同期/Inter-thread synchronization in Pthread

スレッドでプログラムを作っていると、あるスレッドが別のスレッドの仕事の 完了を待つ必要が出がある。

◆Pthteadでの有限バッファ/bounded buffer in Pthread

Unix のパイプのようなことをスレッドを使って実行したい。
thread_A | thread_B
2つのスレッドの間には、 有限バッファ(bounded buffer)を置く。

バッファが空の時、thread_B() は、thread_A() が何かデータをバッファに入 れるのを待つ。バッファがいっぱいの時、thread_A() は、thread_B() がバッ ファから何かデータを取り出すのを待つ。

◆条件変数(condition variable)

条件変数(condition variable) で、ある条件が生じたことを待つ。

条件変数の操作 operations:

wait
ある条件が満たされるまで待つ wait for a condition
broadcast
ある条件が満たされたことを伝える。待っているスレッドが全て起き上がる。 Raise a condition. Unblocks all threads currently blocked on the specified condition variable.
signal
ある条件が満たされたことを伝える。待っているスレッドが1つだけ起き上がる。 Raise a condition. Unblocks at least one thread currently blocked on the specified condition variable.

◆条件変数を使った有限バッファ(bounded buffer with condition variables)

Concurrent Pascalのプログラムと同じ 手続き名と変数名(same procedure and variable names)
   1: 
   2: /*
   3:  * condv-buffer.c -- 条件変数を使った有限バッファ
   4:  */
   5: 
   6: #include <stdio.h>      /* printf() */
   7: #include <stdlib.h>     /* malloc(), exit() */
   8: #include <pthread.h>
   9: 
  10: #define BUFFER_SIZE     4               /* バッファの大きさ */
  11: struct circular_buffer
  12: {
  13:         int rp ;                        /* 読み出す位置 */
  14:         int wp ;                        /* 書き込む位置 */
  15:         int used ;                      /* バッファ内の要素数 */
  16:         int data[BUFFER_SIZE];          /* データを保存する場所 */
  17:         pthread_mutex_t mutex ;         /* この構造体の相互排除のための mutex */
  18:         pthread_cond_t  not_full ;      /* バッファが一杯ではない状態を待つための条件変数 */
  19:         pthread_cond_t  not_empty ;     /* バッファが空ではない状態を待つための条件変数 */
  20: };
  21: 
  22: void thread_A(struct circular_buffer *b);
  23: void thread_B(struct circular_buffer *b);
  24: 
  25: void put( struct circular_buffer *b,int x )
  26: {
  27:         pthread_mutex_lock( &b->mutex );
  28: loop:   if( b->used == BUFFER_SIZE )
  29:         {
  30:             pthread_cond_wait( &b->not_full,&b->mutex );
  31:             goto loop;
  32:         }
  33:         b->data[ b->wp++ ] = x ;
  34:         if( b->wp >= BUFFER_SIZE )
  35:             b->wp = 0 ;
  36:         b->used ++ ;
  37:         pthread_cond_broadcast( &b->not_empty );
  38:         pthread_mutex_unlock( &b->mutex );
  39: }
  40: 
  41: int get( struct circular_buffer *b )
  42: {
  43:     int x ;
  44:         pthread_mutex_lock( &b->mutex );
  45: loop:   if( b->used == 0 )
  46:         {
  47:             pthread_cond_wait( &b->not_empty,&b->mutex );
  48:             goto loop;
  49:         }
  50:         x = b->data[ b->rp++ ] ;
  51:         if( b->rp >= BUFFER_SIZE )
  52:             b->rp = 0 ;
  53:         b->used -- ;
  54:         pthread_cond_broadcast( &b->not_full );
  55:         pthread_mutex_unlock( &b->mutex );
  56:         return( x );
  57: }
  58: 
  59: main()
  60: {
  61:     pthread_t t1 ;
  62:     pthread_t t2 ;
  63:     struct circular_buffer *b  ;
  64:         b = (struct circular_buffer *)malloc(sizeof(struct circular_buffer));
  65:         if( b == NULL )
  66:         {
  67:             perror("no memory for struct buffer\n");
  68:             exit( -1 );
  69:         }
  70:         b->rp = 0 ;
  71:         b->wp = 0 ;
  72:         b->used = 0 ;
  73:         pthread_mutex_init( &b->mutex, NULL );
  74:         pthread_cond_init( &b->not_full,NULL );
  75:         pthread_cond_init( &b->not_empty,NULL );
  76:         pthread_setconcurrency( 2 );
  77:         pthread_create( &t1, NULL, (void *)thread_A, (void *)b );
  78:         pthread_create( &t2, NULL, (void *)thread_B, (void *)b );
  79:         pthread_join( t1, NULL );
  80:         pthread_join( t2, NULL );
  81: }
  82: 
  83: void thread_A( struct circular_buffer *b )      /* producer */
  84: {
  85:     int i,x ;
  86:         for( i = 0 ; i<10 ; i++ )
  87:         {
  88:             x = i ;
  89:             printf("thread_A(): put( %d )\n",x );
  90:             put( b,x );
  91:         }
  92: }
  93: 
  94: void thread_B( struct circular_buffer *b )      /* consumer */
  95: {
  96:     int i, x ;
  97:         for( i = 0 ; i<10 ; i++ )
  98:         {
  99:             x = get( b );
 100:             printf("thread_B(): get() %d.\n", x );
 101:         }
 102: }

put() は、バッファにデータを追加する時に使う手続き。

基本的には、入口で pthread_mutex_lock() し、 出口で pthread_mutex_unlock() する。

バッファが一杯の時には、条件変数b->not_full で、 一杯でないという条件になるまで待つ。

待っている間は、mutex のロックは解除される。

pthread_cond_wait() からリターンして来る時には、もう一度 ロックされた状態に戻る。 pthread_cond_wait() でスリープしてある間に、 変数 (rp,wp,data)が書き換えられている可能性があるので、 もう一度最初から調べる。

get() は、バッファからデータを取り出す時に使う手 続き。put()とほぼ対称形。バッファが空の時に、wait し、バッファがもはや一杯ではないことをbroadcast する。

thread_A() は、10回バッファにデータを書き込むスレッド。 thread_B() は逆に、10回バッファからデータを読み出すス レッド。

◆実行結果 execution results

% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/condv-buffer.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/Makefile [←]
% make condv-buffer [←]
gcc    -c -o condv-buffer.o condv-buffer.c
gcc condv-buffer.o -lpthread -o condv-buffer
% ./condv-buffer  [←]
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 4.
thread_B(): get() 5.
thread_B(): get() 6.
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
% []
複数のスレッドが同時に動いている。バッファにためられるのは、最大4なの に、put() が 5 回連続して成功しているように見える。printf() の順番と put(), get() の順番は違うことがある。

◆Concurrent Pascal と Pthread の比較 / comparison

共通点 common ideas 相違点 differences

◆種村流/Tanemura Style

    pthread_mutex_lock( &mutex );
    while( 1 )
    {
        if( condition_checking() )
            pthread_cond_wait( &cv, &mutex );
	perform();
    }

◆signal or broadcastか

バッファに要素を「1つずつ」追加しているので、 pthread_cond_signal() でもよい。 pthread_cond_broadcast() に変えても動くようにプログラムを 作る。

pthread_cond_wait() で待っている間に条件が変わっているかもしれないので、 最初から調べ直す。signal で1人だけしか起き上がらないと仮定してはいけ ない。

「1つずつ」ではなく、複数個同時に読み書きする時には、 pthread_cond_broadcast() でないとだめ。

迷った時には、pthread_cond_broadcast()

◆ダブルバッファリング(double buffering)

整数を1つバッファに書き込むだけでロック/アンロックを行なっていると、 実際の並列処理では重たい。ロックの回数を減らすために、ダブルバッファリ ングと呼ばれる技術がよく使われる。読み手と書き手で別々にバッファをもう け、1つのバッファの処理をしている間は、ロックを行なわない。

■再帰的 mutex (recursive mutex)

1つのスレッドで1つの mutex を複数回ロックしたい。

開いたモジュールでは一度外にだたスレッドがもう一度入ってくる。

図? 開いたモジュールと閉じたモジュール

◆標準mutexでのデッドロック/deadlock with a normal mutex

export している関数の入口で lock, 出口 unlockを入れて、 スレッド・セーフなモジュールを作りたい。 export している関数が、他の export している関数を呼び出すと、デッドロッ クになる。
   1: /*
   2:  * mutex-reclock-normal.c -- 通常の mutex を使う例(デッドロック)
   3:  */
   4: 
   5: #include <stdio.h>      /* printf() */
   6: #include <pthread.h>
   7: 
   8: void thread_A(), thread_B();
   9: int     shared_resource ;
  10: pthread_mutex_t mutex1 ;
  11: 
  12: deposit( int n )
  13: {
  14:         pthread_mutex_lock( &mutex1 );
  15:         shared_resource += n ;
  16:         pthread_mutex_unlock( &mutex1 );
  17: }
  18: 
  19: add_interest()
  20: {
  21:     int i ;
  22:         pthread_mutex_lock( &mutex1 );
  23:         i = shared_resource * 0.05 ;
  24:         deposit( i );
  25:         pthread_mutex_unlock( &mutex1 );
  26: }
  27: 
  28: main() {
  29:     pthread_t t1 ;
  30:     pthread_t t2 ;
  31:         shared_resource = 1000000 ;
  32:         pthread_mutex_init( &mutex1, NULL );
  33: 
  34:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  35:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  36:         pthread_join( t1, NULL );
  37:         pthread_join( t2, NULL );
  38:         printf("main(): shared_resource == %d\n", shared_resource );
  39: }
  40: 
  41: void thread_A()
  42: {
  43:         printf("thread_A(): deposit( 10000 ) ... \n");
  44:         deposit( 10000 );       
  45:         printf("thread_A(): deposit( 10000 ) done. \n");
  46: }
  47: 
  48: void thread_B()
  49: {
  50:         printf("thread_B(): add_interest() ... \n");
  51:         add_interest();
  52:         printf("thread_B(): add_interest() done. \n");
  53: }
実行例。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/mutex-reclock-normal.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/Makefile [←]
% make mutex-reclock-normal [←]
gcc    -c -o mutex-reclock-normal.o mutex-reclock-normal.c
gcc mutex-reclock-normal.o -lpthread -o mutex-reclock-normal
% ./mutex-reclock-normal  [←]
thread_A(): deposit( 10000 ) ... 
thread_B(): add_interest() ... 
thread_A(): deposit( 10000 ) done. 
^C (強制終了)
% []
% []

注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。

#CFLAGS=	-DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP
この結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c [←]

◆再帰的mutex/with recursive mutex

   1: /*
   2:  * mutex-reclock-recursive.c -- 再帰的 mutex を使う例
   3:  */
...
  28: static int
  29: my_pthread_mutex_init_recursive( pthread_mutex_t *mutex )
  30: {
  31:     pthread_mutexattr_t attr ;
  32:     int err ;
  33:         if( (err=pthread_mutexattr_init( &attr )) < 0 )
  34:             return( 0 );
  35:         if( (err=pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE)) <0 )
  36:             return( 0 );
  37:         err = pthread_mutex_init( mutex,&attr );
  38:         return( err );
  39: }
  40: 
  41: main()
  42: {
  43:     pthread_t t1 ;
  44:     pthread_t t2 ;
  45:         shared_resource = 1000000 ;
  46:         my_pthread_mutex_init_recursive( &mutex1 );
  47: 
  48:         pthread_create( &t1, NULL, (void *)thread_A, 0 );
  49:         pthread_create( &t2, NULL, (void *)thread_B, 0 );
  50:         pthread_join( t1, NULL );
  51:         pthread_join( t2, NULL );
  52:         printf("main(): shared_resource == %d\n", shared_resource );
  53: }
実行例。deposit() と add_interest() のタイミングによっては、最終結果は 違うことがある。
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/mutex-reclock-recursive.c [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/Makefile [←]
% make mutex-reclock-recursive [←]
gcc    -c -o mutex-reclock-recursive.o mutex-reclock-recursive.c
gcc mutex-reclock-recursive.o -lpthread -o mutex-reclock-recursive
% ./mutex-reclock-recursive  [←]
thread_A(): deposit( 10000 ) ... 
thread_A(): deposit( 10000 ) done. 
thread_B(): add_interest() ... 
thread_B(): add_interest() done. 
main(): shared_resource == 1060500
% ./mutex-reclock-recursive [←]
thread_A(): deposit( 10000 ) ... 
thread_A(): deposit( 10000 ) done. 
thread_B(): add_interest() ... 
thread_B(): add_interest() done. 
main(): shared_resource == 1060500
% []

注意:Linux で、PTHREAD_MUTEX_RECURSIVE が未定義でコンパイルできない場 合、Makefile の次の行をコメントアウトしなさい。

#CFLAGS=	-DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP
この結果、次のように -D が付いた形でコンパイルされる。
% gcc -c -DPTHREAD_MUTEX_RECURSIVE=PTHREAD_MUTEX_RECURSIVE_NP file.c [←]

◆stdio専用再帰的lock/ recursive locking for stdio

printf(), fputs() などの標準入出力(stdio)ライブラリ自体は、スレッド・ セーフなので、単体で使う分にはロックは不要である。 一連の入出力をまとめるために、ロックしなければならないことがある。
	printf("hello,world\n");
上と下は、結果が違う。
	printf("hello,");
	/* ここに他のスレッドの出力が混じることがある。
	   Other threads can print something here.
	*/
	printf("world\n");
これを避けるには、 flockfile(),funlockfile(),ftrylockfile()を使う。
	flockfile(stdout);
	printf("hello,");
	/* ここに他のスレッドの出力が混じることはない
	   Other threads cannot print something here.
	 */
	printf("world\n");
	funlockfile(stdout);
putchar()getchar()は、遅すぎる。 flockfile()/funlockfile()の中で使うための putchar_unlocked(),getchar_unlocked(),putc_unlocked(),getc_unlocked() が用意されている。printf_unsafe() はない。

■Pthreadとメモリ/Pthread and memory

◆auto変数/auto variables

各スレッドには、独立したスタックが割り当てられる。C言語の auto 変数 (auto variables, local variables)は、スレッドごとにコピーが作られる。

<−>再帰呼出し

スレッド間でポインタを渡す時には、スレッドの寿命にも注意。

◆static変数(static variables)

シングルスレッドのプログラムでは、static変数は、プログラムのモジュール 性(modularity) を高めるために有効に使われてきた。

マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。

◆static変数を使ったライブラリ関数/library functions that use static variables

TCP/IP でプログラムを書く時に使う gethostbyname() は、 static変数に値をセットして、その番地を返す。

struct hostent *gethostbyname( char *name ){
    static struct hostent ret ;
	.....
	return( &ret );
}

複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。

◆スレッド・セーフ(thread-safe)

複数のスレッドで呼び出してもきちんと動作することを、 スレッド・セーフ(thread-safe) という。 MT-Safe(multi-thread-safe)再入可能(reentrant) ということもある。

externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。

別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。

◆スレッド・セーフなインタフェース(thread-safe interface)

スレッド・セーフになるようにするには、インタフェースを変更する必要があ る。
Sun のマニュアルより:
struct hostent *gethostbyname(const char *name);

struct hostent *gethostbyname_r(const char *name,
     struct hostent *result, char *buffer, int buflen,
     int *h_errnop);

◆スレッド・セーフではない手続きを使う/Using thread-unsafe functions in multithread programs

一見無関係の手続きが内部で変数を共有している場合がある。

■Pthreadでのセマフォの利用/Pthread and semaphore

Pthread には、実時間機能を実現することを目的として、セマフォが使えるよ うになっている。実時間以外の目的でも、セマフォを使ってもよい。

次のような関数が利用可能である。

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
初期化。psharedが0だとプロセス内で有効。valueは初期値。
int sem_wait(sem_t * sem)
P命令。値を減らす。0の場合は止まる。
int sem_trywait(sem_t * sem)
非ブロックのsem_wait()。0でも止まらずエラーを返す。
int sem_post(sem_t * sem)
V命令。値を増やす。
int sem_getvalue(sem_t * sem, int * sval)
現在の値を返す。普通は役には立たない。次の瞬間には他のスレッドがP/Vしているかもしれないので。
int sem_destroy(sem_t * sem)
セマフォを破棄する。
注意: SystemV 由来のセマフォ(semget(),semop(),semctl())とは違う。

注意:名前付きのセマフォもある。sem_open() で作成/初期化し、 sem_unlink() で削除する。

注意:Solaris には、POSIX のセマフォとは別に、カーネル内でのデバイス・ ドライバ作成のためのセマフォが用意されている。

■Javaのスレッド/Threads in Java

Java には、最初から言語のレベルでスレッドの機能が入っている。
Java Pthread
new Thread(); start(); pthread_create()
join() pthread_join()
synchronized pthread_mutex_lock()とpthread_mutex_unlock()の組
wait() pthread_cond_wait()
wait(long timeout) pthread_cond_timedwait()
notify() pthread_cond_signal()
notifyAll() pthread_cond_broadcast()

Java の synchronized は、再帰可能(recursive)。PTHREAD_MUTEX_RECURSIVE 相当。 1つのスレッドが2度同じオブジェクトをロックしてもよい。

Pthreads のプログラムで、1つの mutex と1つの条件変数を使ったものなら、 Java で簡単に書き直せる。

Pthreadでの有限バッファのプログラムは、1つの mutex で2つの条件変数を使っているので、単純には Java で書き直せない。 生産者側と消費者側が同時に待つことはないという性質を利用する。

Java で書かれたスレッドのプログラムは、汚いものがけっこうある。スレッ ド「間」の同期で、対称系になるべき所を、片方のスレッドのメソッドにして、 非対称になっていることがある。Java でプログラムを書く時にも、active object (thread) と passive object (threadなし)をきちんと分けた方がよい。

◆Concurrent Pascal と Java の比較(comparisons)

共通点 common ideas 相違点 differences

◆条件変数を使った有限バッファ(Java)

Concurrent Pascalのプログラムと同じ 手続き名と変数名(same procedure and variable names)
   1: 
   2: /*
   3:  * CircularBuffer.java -- Java による環状バッファ
   4:  */
   5: 
   6: class CircularBuffer
   7: {
   8:     static final int BUFFER_SIZE = 4 ;
   9:     int rp ;            // 読み出す位置
  10:     int wp ;            // 書き込む位置
  11:     int data[];         // データを保存する場所
  12:     int used ;          // バッファ内の要素数
  13:     CircularBuffer()
  14:     {
  15:         data = new int[BUFFER_SIZE];
  16:         rp = 0 ;
  17:         wp = 0 ;
  18:         used = 0;
  19:     }
  20: 
  21:     public synchronized void put( int x ) throws InterruptedException
  22:     {
  23:         while( used == data.length )
  24:             wait();
  25:         data[ wp++ ] = x ;
  26:         if( wp == data.length )
  27:             wp = 0 ;
  28:         used++ ;
  29:         notifyAll();
  30:     }
  31:     public synchronized int get() throws InterruptedException
  32:     {
  33:         int x ;
  34:         while( used == 0 )
  35:             wait();         
  36:         x = data[ rp++ ] ;
  37:         if( rp >= data.length )
  38:             rp = 0 ;
  39:         used--;
  40:         notifyAll();
  41:         return( x );
  42:     }
  43: }
   1: 
   2: /*
   3:  * CircularBufferDemo.java -- Java による有限バッファのデモ
   4:  */
   5: 
   6: class Thread_A implements Runnable // Producer
   7: {
   8:     CircularBuffer b;
   9:     Thread_A( CircularBuffer b )
  10:     {
  11:         this.b = b;
  12:     }
  13:     public void run()
  14:     {                           
  15:         int i,x ;
  16:         for( i = 0 ; i<10 ; i++ )
  17:         {
  18:             try
  19:             {
  20:                 x = i ;
  21:                 System.out.println("Thread_A(): put( "+x+" )");
  22:                 b.put( x );
  23:             }
  24:             catch( InterruptedException e )
  25:             {
  26:                 System.err.println("Thread_A(): Interrupted");
  27:                 break;
  28:             }
  29:         }
  30:     }
  31: }
  32: 
  33: class Thread_B implements Runnable // Producer
  34: {
  35:     CircularBuffer b;
  36:     Thread_B( CircularBuffer b )
  37:     {
  38:         this.b = b;
  39:     }
  40:     public void run()
  41:     {                           
  42:         int i,x ;
  43:         for( i = 0 ; i<10 ; i++ )
  44:         {
  45:             try
  46:             {
  47:                 x = b.get();
  48:                 System.out.println("Thread_B(): got() "+x+".");
  49:             }
  50:             catch( InterruptedException e )
  51:             {
  52:                 System.err.println("Thread_B(): Interrupted");
  53:                 break;
  54:             }
  55:         }
  56:     }
  57: }
  58: 
  59: class CircularBufferDemo
  60: {
  61:     public static void main(String argv[])
  62:     {
  63:         final CircularBuffer b = new CircularBuffer();
  64:         Thread t1 = new Thread( new Thread_A(b) );
  65:         t1.start();
  66:         Thread t2 = new Thread( new Thread_B(b) );
  67:         t2.start();
  68:         try
  69:         {
  70:             t1.join();
  71:             t2.join();
  72:         }
  73:         catch( InterruptedException e )
  74:         {
  75:             System.err.println("main(): Interrupted");
  76:         }
  77:     }
  78: }
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/CircularBuffer.java [←]
% wget http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2015/2015-10-06/ex/CircularBufferDemo.java [←]
% javac CircularBuffer.java CircularBufferDemo.java  [←]
% java CircularBufferDemo  [←]
Thread_A(): put( 0 )
Thread_A(): put( 1 )
Thread_A(): put( 2 )
Thread_A(): put( 3 )
Thread_A(): put( 4 )
Thread_B(): got() 0.
Thread_B(): got() 1.
Thread_B(): got() 2.
Thread_B(): got() 3.
Thread_B(): got() 4.
Thread_A(): put( 5 )
Thread_A(): put( 6 )
Thread_B(): got() 5.
Thread_A(): put( 7 )
Thread_A(): put( 8 )
Thread_A(): put( 9 )
Thread_B(): got() 6.
Thread_B(): got() 7.
Thread_B(): got() 8.
Thread_B(): got() 9.
% []

◆BlockingQueue

Java 2 Standard Edition (J2SE) 5.0 で、 java.util.concurrent パッケージに interface BlockingQueue が追加された。 これを使えば、自分で有限バッファを記述する必要はない。 具体的には、ArrayBlockingQueue を使う。 上限を指定しない BlockingQueue もあるが、利用しないことを奨める。

◆java.util.concurrent.locks package

Java 2 Standard Edition (J2SE) 5.0 で、次のようなクラスが追加された。
Lock
Pthread の mutex 相当。 ブロックを越えてロックできる。
Condition
Pthread の条件変数(condition variable)相当。 Lock と合わせて使う。
ReadWriteLock
読書きロック。 Pthread pthread_rwlock_t 相当。

◆Java のセマフォ/Semaphore in Java

Java 2 Standard Edition (J2SE) 5.0 には、 java.util.concurrent.Semaphore クラスがある。

■課題(assignment)1 並行・並列・分散プログラミング、マルチスレッド・プログラミング/concurrent,parallel,and distributed programming,and Multithread programming

以下の問題の中で、次のものを選び、回答しなさい。 Choose one question and answer it from the following list: レポートには、次のものを含めなさい。 A report must include following. この内容を含む「テキスト」ファイルを作成し、 レポート提出ページ から提出しなさい。

締切りは、2015年10月11日、 23:59:59 とする。

★問題(101) スレッドの数/ number of threads

相互排除のプログラムや条件変数プログラムでスレッドの数を増やして実行し てみなさい。 The multithread programs using mutex and condition variables creates one or two new threads. Rewrite these programs that creates three or more threads and execute them.

★問題(102) 手続きとスレッド/ procedure and threads

1つの手続き(C言語の関数)から複数のスレッドを生成してみなさい。 Create multiple threads from a single procedure (function in C).

★問題(103) ダブルバッファリング/ double buffering

循環バッファのプログラムダブルバッファリングを行なうプログラムに 変更しなさい。 Rewrite the program of a bounded buffer to perform double buffering.

★問題(104) 条件変数の削減/ removing condition variables

循環バッファのプログラムで、条件変数を1つになるように変更しなさい。 一度に wait する必要があるのは、put() 側か get() のどちらか一方だけである。 よって、両方とも同じキューにつないでも、動作する。

The program of bounded buffer uses two condition variables. Rewrite this program to use a single condition variables. Since a thread can block either on put() or get(), we can share a single queue of a condition variable.

★問題(105) Javaにおける再帰的モニタの確認/ Recursive mutex in Java

再帰的mutexの例題で使った Pthread のプログラムを Java で書き直しなさい。 Rewrite the program of recursive mutex in C into the program in Java.

★問題(106) 残高照会と引き出し機能の追加/Adding balance() and withdraw()

再帰的mutexの例題で使ったプ ログラムに、口座の残高を返す手続きと口座から現金を引き出す手続きを付け なさい。ただし、現金の引出しでは、残高が負にならないようにしなさい。 Add following two functions to the program of recursive mutex . The first function returns the balance of the bank account. The second function withdraws the given amount of money from the bank account. If the bank account does not have sufficient money, this function should nothing.

int balance()
{
...
// return the balance 残高を返す
}
int withdraw( int n )
{
...
// return 1 if succeeded. 成功の時は 1 を返す
// return 0 if failed due to insufficient money. 残高不足の時は 0
}

プログラミング言語は、C言語(Pthread)または Java を用いなさい。 You should use the programming language C with Thread or Java.

★問題(107) 複数要素を受付ける有限バッファ/ multi-item bounded buffer

条件変数を使った有限バッファのプログラムのうち、 (C言語Pthread版) 、または、 (Java版) を書き換えて、 一度に n 個(n は可変)の要素を受付けるようにしなさい。 Rewrite the bounded buffer programs in (C with Pthread) or (Java) to take n items at one. すなわち、

作成した有限バッファに対して複数のスレッドがput(n,x[]) した時、x[] の要 素がインターリーブされることがあるか? たとえば、2つのスレッドが次のよう な操作をしたとする。 When multiple threads calls the functin put(n,x[]), can the items in each x[] be interleved or not? For example, consider the following actions. このデータをスレッド3で1個ずつ get() した時に、インターリーブされないな らば、必ず次のいずれかの結果になる。 Thread 3 takes items with get(). If the items are not interleaved, the results will be either of the following. インターリーブされたとすると、次のような結果が出てくることがある。 If the items are interleaved, the output can be as follows.

★問題(108) 循環バッファをセマフォで実現する/ bounded buffer with semaphores

循環バッファのプログラムを、セマフォを使って書き直しなさい。 Rewrite the bounded buffer programs in (C with Pthread) or (Java) to use semaphores.

プログラミング言語は、C言語(Pthread)または Java を用いなさい。 You should use the programming language C with Thread or Java.

セマフォを使った循環バッファのプログラムの作成は、次の教科書の練習問 題にもなっている(巻末に回答もある)。 The following text book of operating systems contains exercise of a bounded buffer. The answer is also included in the end of the book.

清水謙多郎: "オペレーティングシステム",岩波書店 (1992). ISBN: 4000078526.

★問題(109) Javaのjava.util.concurrent.*の利用/ Using java.util.concurrent.* in Java

このページで示した例題、または、練習問題を、Java 言語で次のクラス、また はインタフェースを1つ以上用いて実装しなさい。 Use one or more following classes or interfaces and write a program in these exercises or examples in this page.
Last updated: 2015/10/06 08:25:50
Yasushi Shinjo / <yas@cs.tsukuba.ac.jp>