並行システム
                               システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系
                               新城 靖
                               <yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
	http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08
あるいは、次のページから手繰っていくこともできます。
	http://www.cs.tsukuba.ac.jp/~yas/sie/
	http://www.cs.tsukuba.ac.jp/~yas/
http://www2a.biglobe.ne.jp/~seki/ruby/d208.html,
関 将俊氏による Rinda の解説。本のサンプルコードと説明含む。
並列処理の最終結果に焦点を当る。
例: 家の建築:
例: 家の建築:
問題にあったものを使う。
実際の家の建築では、全部の方法が使われている。
うまく行く例:ベクトルの足し算: S[i] = A[i] + B[i]
M[][0] に最初の位置。
position(i,j), 繰り返し j での 物体 i の位置を計算する。
position(X,j-1) の終了を待つ。
プロセスがデータに化けて行く。
物体に対応したプロセスを作る。手順(ワーカの仕事):集合に含まれている全ての物体について、次の位置を 計算する。
マスタで、N 個の物体を作る。
ワーカを作る。ワーカの数は、N 個ではなくて、もっと少ない(CPU数と同じに する)。
物体の位置を分散データ構造体(共有メモリ)に置く。

図? 手法の変換
解決1:ライブデータ構造体を、受動的な構造体に書き換える。プロセスを複数 の構造体に対応させる。
問題2:分散データ構造体で書いたプログラム(共有空間が必要)が、NORMA で うまく動かない。
解決:メッセージ・パッシングに変換する。
Carriero と Gelernter の主張:分散データ構造体がいい。
Java では、スレッド、RMI、Javaspaces の順に導入された。
モニタは、メッセージパッシングの仲間か分散データ構造体か。
タプルペースモデル(tuple space model)で分散データ構造体を支援。(メッ セージ・パッシングやライブデータ構造体的なプログラムも書ける。)
タプルは、型付きの値の並び。
タプルの例:
("a string", 10.01, 17, 10)
(0,1)
2種類のタプル
out("a string", 10.01, 17, x)
in("a string", ?f, ?i, y)
「?」付のものは、formal。型が同じものとマッチする。 ついていないものは、actual。型と値が同じものとマッチする。
eval("e", 7, exp(7))
新しくプロセス(スレッド)が作られ、exp(7) を計算しはじめる。プロセスが
終了すると、最終結果は、値に変わり、out() されたのと同じになる。
タプルの形式: (name,val)
読込み:
    rd(name,?val)
変更:
    in(name,?val)
    val = val + 1 ;
    out(name,val)
P命令: 
    in("sem-1")
V命令:
    out("sem-1");
同じタプルを out したら溜る。
注意:in() したデータは、1プロセスでしかアクセスされないので、セマフォ などによるロックは不要。
仕事を入れる:
    out("task",TaskDescription)
仕事を取り出す:
    in("task",?NewTask)
for( i=0 ; i<N; i++ )
{
    func(i,args);
}
並列:
for( i=0 ; i<N; i++ )
{
    eval ("loop-33", func(i,args) ); // プロセス生成
}
for( i=0 ; i<N; i++ )
{
    in("loop-33", 1 ); // 待ち
}
func(i,args)
{
   ...
   return( 1 );
}
マルチスレッドの マスタ・スレーブ(バリア付き) 参照。
n プロセスのバリア
初期化:
    out("barrier-37",n)
各プロセス: 1減らして、0になるのを待つ。
    in("barrier-37",?val)
    out("barrier-37",val-1)
    rd("barrier-37",0)
A[10];
("A",0,val00)
("A",1,val01)
("A",2,val02)
...
("A",9,val99)
A[10][10];
("A",0,0,val00)
("A",0,1,val01)
("A",0,2,val02)
...
("A",9,9,val99)
ストリームデータ
    ("stream",0,val0)
    ("stream",1,val1)
    ("stream",2,val2)
    ...
ポインタ
    ("stream","head",0)
    ("stream","tail",0)
ストリームに要素を追加:
    int index;
    in("stream","tail",?index);
    out("stream","tail",index+1);
    out("stream",index,new_element);
ストリームから要素を取り出す:
    int index;
    in("stream","head",?index);
    out("stream","head",index+1);
    in("stream",index,?element);
これは、複数source、複数sink。
source、sinkが1つなら、head, tail をタプル空間に置かなくてもよい。
head の代わりに、局所変数でアクセスする。
ストリームから要素を取り出す:
    int index=0 ;
    while( ... )
    {
       rd("stream",index++,?element);
    }
("primes", 2, 1 ) // primes[2] = 1 ;
("primes", 3, 1 ) // primes[3] = 1 ;
("primes", 4, 0 ) // primes[4] = 0 ;
("primes", 5, 1 ) // primes[5] = 1 ;
("primes", 6, 0 ) // primes[6] = 0 ;
("primes", 7, 1 ) // primes[7] = 1 ;
..
プログラム
   1: /*
   2:  *      prime-results.c
   3:  */
   4: 
   5: #define LIMIT 1000
   6: 
   7: main()
   8: {
   9:     int count, i, ok;
  10:         for( i=2; i<=LIMIT; i++ )
  11:             eval ("primes", i, is_prime(i) );
  12:         count = 0 ;
  13:         for( i=2; i<=LIMIT; i++ )
  14:         {
  15:             rd("primes", i, ?ok);
  16:             if( ok )
  17:                 count ++;
  18:         }
  19:         printf("%d.\n", count );
  20: }
  21: 
  22: is_prime( int me )
  23: {
  24:     int i, limit, ok;
  25:     double sqrt();
  26:         limit = (int) sqrt( (double)me ) + 1;
  27:         for( i=2; i<limit; i++ )
  28:         {
  29:             rd("primes", i, ?ok);
  30:             if( ok && (me%i == 0) )
  31:                 return( 0 );
  32:         }
  33:         return( 1 );
  34: }
2 から sqrt(n) 以下の素数を rd() して得、割ってみて剰りを調べる。
利点
問題点
while( 1 )
{
    in("next task", start );
    out("next task", start + GRAIN);
    start から start + GRAIN の間の素数を探す;
}
("primes", 0, 2, 4  ) // primes[0] = 2 ; p2[0] = 4 ;
("primes", 1, 3, 9  ) // primes[1] = 3 ; p2[1] = 9 ;
("primes", 2, 5, 25 ) // primes[2] = 5 ; p2[2] = 25 ;
("primes", 3, 7, 49 ) // primes[3] = 7 ; p2[3] = 49 ;
("result", start,	  1番目の固まり)
("result", start+GRAIN,   2番目の固まり)
("result", start+GRAIN*3, 3番目の固まり)
("result", start+GRAIN*4, 4番目の固まり)
各「固まり」は、配列で、最後に終り -1 が入っている。
マスタは、この ("result",,) のストリームを読込み、("primes",,,) のスト
リームを作る。
   1: /*
   2:  *      prime-agenda.c
   3:  */
   4: 
   5: #define LIMIT 1000
   6: #define GRAIN 100
   7: #define MAX   1000
   8: 
   9: main( int argc, char *argv[], char *envp[] )
  10: {
  11:     int eot, first_num, i, length, new_primes[GRAIN], np2;
  12:     int num, num_primes, num_workers, primes[MAX], p2[MAX];
  13:         num_workers = atoi( argv[1] );
  14:         for( i=0; i< num_workers; i++ )
  15:             eval ("worker", worker() );
  16: 
  17:         num_primes = init_primes( primes, p2 );
  18:         first_num = primes[num_primes-1] + 2;
  19:         out("next task", first_num);
  20: 
  21:         eot = 0 ; /* end of table */
  22:         for( num=first_num; num<LIMIT; num+=GRAIN)
  23:         {
  24:             in("result", num, ?new_primes:length );
  25:             for( i=0; i<length; i++, num_primes++)
  26:             {
  27:                 primes[num_primes] = new_primes[i];
  28:                 if( !eot )
  29:                 {
  30:                     np2 = new_primes[i] * new_primes[i] ;
  31:                     if( np2 > LIMIT )
  32:                     {
  33:                         eot = 1 ;
  34:                         np2 = -1 ;
  35:                     }
  36:                     out("primes", num_primes, new_primes[i], np2 );
  37:                 }
  38:             }
  39:         }       
  40:         for( i=0; i<num_workers; i++)
  41:             in("worker", ?int);
  42:         printf("%d.\n", num_primes );
  43: }
  44: 
  45: worker()
  46: {
  47:     int count, eot, i, limit, num, num_primes, ok, start;
  48:     int my_primes[GRAIN], primes[MAX], p2[MAX];
  49:         num_primes = init_primes( primes, p2 );
  50:         
  51:         eot = 0;
  52:         while( 1 )
  53:         {
  54:             in("next task", ?num );
  55:             if( num == -1 )
  56:             {
  57:                 out("next task", -1 );
  58:                 return;
  59:             }
  60:             limit = num + GRAIN;
  61:             out("next task", (limit>LIMIT) ? -1 : limit );
  62:             if( limit > LIMIT )
  63:                 limit = LIMIT ;
  64: 
  65:             start = num;
  66:             for( count=0; num<limit; num+=2 )
  67:             {
  68:                 while( !eot && num > p2[num_primes-1] )
  69:                 {
  70:                     rd("primes", num_primes,
  71:                        ?primes[num_primes], ?p2[num_primes] );
  72:                     if( p2[num_primes] < 0 )
  73:                         eot = 1 ;
  74:                     else
  75:                         num_primes++;
  76:                 }
  77:                 for( i=0, ok=1; i<num_primes; i++ )
  78:                 {
  79:                     if( num%primes[i] == 0 )
  80:                     {
  81:                         ok = 0;
  82:                         break;
  83:                     }
  84:                     if( num < p2[i] )
  85:                         break;
  86:                 }
  87:                 if( ok )
  88:                     my_primes[count++] = num ;
  89:             }
  90:             out("result", start, my_primes:count );
  91:         }
  92: }
  93: 
  94: int
  95: init_prime( int primes[MAX], int p2[MAX] )
  96: {
  97:         primes[0] = 2;  p2[0] = 2 * 2;
  98:         primes[1] = 3;  p2[1] = 3 * 3;
  99:         return( 2 );
 100: }
source | pipe_seg-3 | pipe_seg-5 | pipe_seg-7 | .... | sink
("seg", 宛先, ストリームのインデックス, 整数)
たとえば、source の出力(主に pipe_seg-3の入力)は、次のようになる。
("seg", 3, 0, 5 ) // sink が in
("seg", 3, 1, 7 ) // 以下 pipe_seg-3 が in する
("seg", 3, 2, 9 )
("seg", 3, 3, 11 )
("seg", 3, 3, 13 )
...
pipe_seg-3 の出力は、次のようになる。
("seg", 5, 0, 7 )  // sink が in する
("seg", 5, 1, 11 ) // 以下  pipe_seg-5 が in する
("seg", 5, 2, 13 ) 
...
最終的には、次のようなデータがタプル空間に残される。
("source", 1, 2)
("pipe seg", 2, 3)
("pipe seg", 3, 5)
("pipe seg", 4, 7)
...
("sink", MaxIndex, MaxPrime)
この方法は、結果並列や手順並列法よりも、並列性が低い。
   1: /*
   2:  *      prime-specialists.c
   3:  */
   4: 
   5: #define LIMIT 1000
   6: 
   7: main()
   8: {
   9:         eval ("soruce", source());
  10:         eval ("sink", sink());
  11: }
  12: 
  13: source()
  14: {
  15:     int i, out_index=0;
  16:         for( i=0; i<LIMIT; i+= 2)
  17:             out("seg", 3, out_index++, i );
  18:         out("seg", 3, out_index, 0 );
  19: }
  20: 
  21: sink()
  22: {
  23:     int in_index=0, num, prime=3, prime_count=2;
  24:         while( 1 )
  25:         {
  26:             in("seg", prime, in_index++, ?num);
  27:             if( num == 0 )
  28:                 break;
  29:             if( num % prime != 0 )
  30:             {
  31:                 prime_count ++ ;
  32:                 if( num * num < LIMIT )
  33:                 {
  34:                     eval ("pipe seg", pipe_seg(prime, num, in_index));
  35:                     prime = num;
  36:                     in_index = 0;
  37:                 }
  38:             }
  39:         }
  40:         printf("count: %d\n", prime_count );
  41: }
  42: 
  43: pipe_seg( int prime, int next, int in_index )
  44: {
  45:     int num, out_index=0;
  46:         while( 1 )
  47:         {
  48:             in("seg", prime, in_index++, ?num );
  49:             if( num == 0 )
  50:                 break;
  51:             if( num % prime != 0 )
  52:                 out("seg", next, out_index++, num);
  53:         }
  54:         out("seg", next, out_index, num );
  55: }
| Linda | Rinda | 説明 | 
| タプル() | 配列[] | タプル空間に置くことができるデータ構造 | 
| out | write | タプルをタプル空間内に生成する。 | 
| in | take | タプルを取り去る | 
| rd | read | in/takeと似ているが、タプルがタプル空間に残る。 | 
Rinda が提供する空間の特徴
=== で等しい。
% ruby -e 'p 1 === 1'true % ruby -e 'p Integer === 1'
true % ruby -e 'p String === 1'
false % ruby -e 'p /[abc]xy/ === "axy"'
true % ruby -e 'p /[abc]xy/ === "Axy"'
false %
["Message Box", "Hello" ]
   1: #!/usr/bin/env ruby
   2: # make-space.rb -- Make a tuple space and print its URI
   3: 
   4: require 'rinda/tuplespace'
   5: 
   6: def usage()
   7:         $stderr.printf("Usage: %% %s portno\n", $0)
   8:         exit( 1 )
   9: end
  10: 
  11: def main(argv)
  12:         if( argv.length != 1 )
  13:             usage()
  14:         end
  15:         portno = argv[0]
  16:         space = Rinda::TupleSpace.new()
  17:         DRb.start_service("druby://:"+portno, space)
  18:         uri = DRb.uri()
  19:         $stdout.printf("%s\n",uri)
  20:         $stdout.printf("Type ^C to stop this program.\n")
  21:         DRb.thread.join()
  22: end
  23: 
  24: main(ARGV)
   1: #!/usr/bin/env ruby
   2: # mbox-writer.rb -- Write a message to the message box in a tupple space.
   3: 
   4: require 'rinda/tuplespace'
   5: 
   6: def usage()
   7:         $stderr.printf("Usage: %% %s uri message\n", $0)
   8:         exit( 1 )
   9: end
  10: 
  11: def main(argv)
  12:         if( argv.length != 2 )
  13:             usage()
  14:         end
  15:         uri = argv[0]
  16:         message = argv[1]
  17: 
  18:         space = DRbObject.new_with_uri( uri )
  19: 
  20:         tuple = ["Message Box", message ]
  21:         space.write( tuple )
  22:         printf("mbox-writer: wrote[%s]\n",message)
  23: end
  24: 
  25: main(ARGV)
   1: #!/usr/bin/env ruby
   2: # mbox-reader.rb -- Read a message from a message box in a tuple space.
   3: 
   4: require 'rinda/tuplespace'
   5: 
   6: def usage()
   7:         $stderr.printf("Usage: %% %s uri\n", $0)
   8:         exit( 1 )
   9: end
  10: 
  11: def main(argv)
  12:         if( argv.length != 1 )
  13:             usage()
  14:         end
  15:         uri = argv[0]
  16:         DRb.start_service()
  17:         space = DRbObject.new_with_uri( uri )
  18: 
  19:         template = ["Message Box",nil]
  20:         tuple = space.read( template )
  21:         message = tuple[1]
  22:   p tuple # for debug
  23:         printf("mbox-reader: read [%s]\n", message )
  24: end
  25: 
  26: main(ARGV)
space.read()で、タプル空間からタプルを取り出す。
最初の引数は、テンプレートである。
read() は、空間からテンプレートとマッチするエントリを読み出す。
read() は、マッチするエントリがなければ、タイムアウトするまで待つ。 第2引数に秒単位で待ち時間を指定できる。
注意:連続する read() が同じオブジェクトを返す保証はない。
   1: #!/usr/bin/env ruby
   2: # mbox-taker.rb -- Take a message from a message box in a tuple space.
   3: 
   4: require 'rinda/tuplespace'
   5: 
   6: def usage()
   7:         $stderr.printf("Usage: %% %s uri\n", $0)
   8:         exit( 1 )
   9: end
  10: 
  11: def main(argv)
  12:         if( argv.length != 1 )
  13:             usage()
  14:         end
  15:         uri = argv[0]
  16:         DRb.start_service()
  17:         space = DRbObject.new_with_uri( uri )
  18: 
  19:         template = ["Message Box",nil]
  20:         tuple = space.take( template )
  21:         message = tuple[1]
  22:   p tuple # for debug
  23:         printf("mbox-taker: took [%s]\n", message )
  24: end
  25: 
  26: main(ARGV)
take() は、read() と同じだが、エントリを空間から取り去る所が異なる。複
数の take() が重なったとしても、エントリは1つにしか取られない。
% diff mbox-reader.rb mbox-taker.rb2c2 < # mbox-reader.rb -- Read a message from a message box in a tuple space. --- > # mbox-taker.rb -- Take a message from a message box in a tuple space. 20c20 < tuple = space.read( template ) --- > tuple = space.take( template ) 23c23 < printf("mbox-reader: read [%s]\n", message ) --- > printf("mbox-taker: took [%s]\n", message ) %
![[]](../icons/screen-cursor.gif)
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/make-space.rb実行には、Ruby の 1.8 以降が必要。drb.rb を含んだもの。% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/mbox-writer.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/mbox-reader.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/mbox-taker.rb
% chmod +x *.rb
% ls -l
total 32 -rwxr-xr-x 1 yas yas 463 2 7 23:42 make-space.rb -rwxr-xr-x 1 yas yas 485 2 7 23:48 mbox-reader.rb -rwxr-xr-x 1 yas yas 484 2 8 00:06 mbox-taker.rb -rwxr-xr-x 1 yas yas 406 2 7 23:40 mbox-writer.rb % which ruby
/usr/local/bin/ruby % ruby -v
ruby 1.8.6 (2007-03-13 patchlevel 0) [universal-darwin8.0] %
![[]](../icons/screen-cursor.gif)
% ./make-space.rb 1231引数のポート番号は、ぶつからないような番号にする。 自動的に終了しないので、実験が終わったら ^C (Control-C) で殺す。druby://iris:1231 Type ^C to stop this program. (最後に ^C で止める)
Writer を動作させる。
% ./mbox-writer.rb druby://iris:1231 hello最後のウインドウで Reader や Taker を動作させる。mbox-writer: wrote[hello] % ./mbox-writer.rb druby://iris:1231 hi
mbox-writer: wrote[hi] %
![[]](../icons/screen-cursor.gif)
% ./mbox-reader.rb druby://iris:12313回目の take は止まる。別ウインドウで write すれば、先に進む。["Message Box", "hello"] mbox-reader: read [hello] % ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"] mbox-reader: read [hello] % ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"] mbox-reader: read [hello] % ./mbox-taker.rb druby://iris:1231
["Message Box", "hello"] mbox-taker: took [hello] % ./mbox-taker.rb druby://iris:1231
["Message Box", "hi"] mbox-taker: took [hi] % ./mbox-taker.rb druby://iris:1231
...
タプルがない状態先に take/read すると、止まる。この状態で、write すれ ば、read/take が終了する。
interface JavaSpace を実現したオブジェクト
| Linda | JavaSpace | 説明 | 
| out | write | タプルをタプル空間内に生成する。 | 
| in | take | タプルを取り去る | 
| rd | read | in/takeと似ているが、タプルがタプル空間に残る。 | 
write, read, take を使う部分のプログラムは簡単だが、space を利用可能に するのには苦労する。
次の2つのプログラムを作成する。
(1) 初期化プログラム
    String url = argv[0];
    tuple = [url,0]
    space.write( tuple );
(2) アクセスされた時に動作するプログラム
    値を増やす。現在の値を画面に表示する。