並行システム システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系 新城 靖 <yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/
http://www2a.biglobe.ne.jp/~seki/ruby/d208.html
,
関 将俊氏による Rinda の解説。本のサンプルコードと説明含む。
並列処理の最終結果に焦点を当る。
例: 家の建築:
例: 家の建築:
問題にあったものを使う。
実際の家の建築では、全部の方法が使われている。
うまく行く例:ベクトルの足し算: S[i] = A[i] + B[i]
M[][0] に最初の位置。
position(i,j), 繰り返し j での 物体 i の位置を計算する。
position(X,j-1) の終了を待つ。
プロセスがデータに化けて行く。
物体に対応したプロセスを作る。手順(ワーカの仕事):集合に含まれている全ての物体について、次の位置を 計算する。
マスタで、N 個の物体を作る。
ワーカを作る。ワーカの数は、N 個ではなくて、もっと少ない(CPU数と同じに する)。
物体の位置を分散データ構造体(共有メモリ)に置く。
図? 手法の変換
解決1:ライブデータ構造体を、受動的な構造体に書き換える。プロセスを複数 の構造体に対応させる。
問題2:分散データ構造体で書いたプログラム(共有空間が必要)が、NORMA で うまく動かない。
解決:メッセージ・パッシングに変換する。
Carriero と Gelernter の主張:分散データ構造体がいい。
Java では、スレッド、RMI、Javaspaces の順に導入された。
モニタは、メッセージパッシングの仲間か分散データ構造体か。
タプルペースモデル(tuple space model)で分散データ構造体を支援。(メッ セージ・パッシングやライブデータ構造体的なプログラムも書ける。)
タプルは、型付きの値の並び。
タプルの例:
("a string", 10.01, 17, 10) (0,1)
2種類のタプル
out("a string", 10.01, 17, x) in("a string", ?f, ?i, y)
「?」付のものは、formal。型が同じものとマッチする。 ついていないものは、actual。型と値が同じものとマッチする。
eval("e", 7, exp(7))新しくプロセス(スレッド)が作られ、exp(7) を計算しはじめる。プロセスが 終了すると、最終結果は、値に変わり、out() されたのと同じになる。
タプルの形式: (name,val)
読込み: rd(name,?val) 変更: in(name,?val) val = val + 1 ; out(name,val)
P命令: in("sem-1") V命令: out("sem-1");同じタプルを out したら溜る。
注意:in() したデータは、1プロセスでしかアクセスされないので、セマフォ などによるロックは不要。
仕事を入れる: out("task",TaskDescription) 仕事を取り出す: in("task",?NewTask)
for( i=0 ; i<N; i++ ) { func(i,args); }並列:
for( i=0 ; i<N; i++ ) { eval ("loop-33", func(i,args) ); // プロセス生成 } for( i=0 ; i<N; i++ ) { in("loop-33", 1 ); // 待ち } func(i,args) { ... return( 1 ); }
マルチスレッドの マスタ・スレーブ(バリア付き) 参照。
n プロセスのバリア
初期化: out("barrier-37",n)
各プロセス: 1減らして、0になるのを待つ。 in("barrier-37",?val) out("barrier-37",val-1) rd("barrier-37",0)
A[10]; ("A",0,val00) ("A",1,val01) ("A",2,val02) ... ("A",9,val99)
A[10][10]; ("A",0,0,val00) ("A",0,1,val01) ("A",0,2,val02) ... ("A",9,9,val99)
ストリームデータ ("stream",0,val0) ("stream",1,val1) ("stream",2,val2) ... ポインタ ("stream","head",0) ("stream","tail",0) ストリームに要素を追加: int index; in("stream","tail",?index); out("stream","tail",index+1); out("stream",index,new_element); ストリームから要素を取り出す: int index; in("stream","head",?index); out("stream","head",index+1); in("stream",index,?element);これは、複数source、複数sink。
source、sinkが1つなら、head, tail をタプル空間に置かなくてもよい。
head の代わりに、局所変数でアクセスする。
ストリームから要素を取り出す: int index=0 ; while( ... ) { rd("stream",index++,?element); }
("primes", 2, 1 ) // primes[2] = 1 ; ("primes", 3, 1 ) // primes[3] = 1 ; ("primes", 4, 0 ) // primes[4] = 0 ; ("primes", 5, 1 ) // primes[5] = 1 ; ("primes", 6, 0 ) // primes[6] = 0 ; ("primes", 7, 1 ) // primes[7] = 1 ; ..
プログラム
1: /* 2: * prime-results.c 3: */ 4: 5: #define LIMIT 1000 6: 7: main() 8: { 9: int count, i, ok; 10: for( i=2; i<=LIMIT; i++ ) 11: eval ("primes", i, is_prime(i) ); 12: count = 0 ; 13: for( i=2; i<=LIMIT; i++ ) 14: { 15: rd("primes", i, ?ok); 16: if( ok ) 17: count ++; 18: } 19: printf("%d.\n", count ); 20: } 21: 22: is_prime( int me ) 23: { 24: int i, limit, ok; 25: double sqrt(); 26: limit = (int) sqrt( (double)me ) + 1; 27: for( i=2; i<limit; i++ ) 28: { 29: rd("primes", i, ?ok); 30: if( ok && (me%i == 0) ) 31: return( 0 ); 32: } 33: return( 1 ); 34: }2 から sqrt(n) 以下の素数を rd() して得、割ってみて剰りを調べる。
利点
問題点
while( 1 ) { in("next task", start ); out("next task", start + GRAIN); start から start + GRAIN の間の素数を探す; }
("primes", 0, 2, 4 ) // primes[0] = 2 ; p2[0] = 4 ; ("primes", 1, 3, 9 ) // primes[1] = 3 ; p2[1] = 9 ; ("primes", 2, 5, 25 ) // primes[2] = 5 ; p2[2] = 25 ; ("primes", 3, 7, 49 ) // primes[3] = 7 ; p2[3] = 49 ;
("result", start, 1番目の固まり) ("result", start+GRAIN, 2番目の固まり) ("result", start+GRAIN*3, 3番目の固まり) ("result", start+GRAIN*4, 4番目の固まり)各「固まり」は、配列で、最後に終り -1 が入っている。 マスタは、この ("result",,) のストリームを読込み、("primes",,,) のスト リームを作る。
1: /* 2: * prime-agenda.c 3: */ 4: 5: #define LIMIT 1000 6: #define GRAIN 100 7: #define MAX 1000 8: 9: main( int argc, char *argv[], char *envp[] ) 10: { 11: int eot, first_num, i, length, new_primes[GRAIN], np2; 12: int num, num_primes, num_workers, primes[MAX], p2[MAX]; 13: num_workers = atoi( argv[1] ); 14: for( i=0; i< num_workers; i++ ) 15: eval ("worker", worker() ); 16: 17: num_primes = init_primes( primes, p2 ); 18: first_num = primes[num_primes-1] + 2; 19: out("next task", first_num); 20: 21: eot = 0 ; /* end of table */ 22: for( num=first_num; num<LIMIT; num+=GRAIN) 23: { 24: in("result", num, ?new_primes:length ); 25: for( i=0; i<length; i++, num_primes++) 26: { 27: primes[num_primes] = new_primes[i]; 28: if( !eot ) 29: { 30: np2 = new_primes[i] * new_primes[i] ; 31: if( np2 > LIMIT ) 32: { 33: eot = 1 ; 34: np2 = -1 ; 35: } 36: out("primes", num_primes, new_primes[i], np2 ); 37: } 38: } 39: } 40: for( i=0; i<num_workers; i++) 41: in("worker", ?int); 42: printf("%d.\n", num_primes ); 43: } 44: 45: worker() 46: { 47: int count, eot, i, limit, num, num_primes, ok, start; 48: int my_primes[GRAIN], primes[MAX], p2[MAX]; 49: num_primes = init_primes( primes, p2 ); 50: 51: eot = 0; 52: while( 1 ) 53: { 54: in("next task", ?num ); 55: if( num == -1 ) 56: { 57: out("next task", -1 ); 58: return; 59: } 60: limit = num + GRAIN; 61: out("next task", (limit>LIMIT) ? -1 : limit ); 62: if( limit > LIMIT ) 63: limit = LIMIT ; 64: 65: start = num; 66: for( count=0; num<limit; num+=2 ) 67: { 68: while( !eot && num > p2[num_primes-1] ) 69: { 70: rd("primes", num_primes, 71: ?primes[num_primes], ?p2[num_primes] ); 72: if( p2[num_primes] < 0 ) 73: eot = 1 ; 74: else 75: num_primes++; 76: } 77: for( i=0, ok=1; i<num_primes; i++ ) 78: { 79: if( num%primes[i] == 0 ) 80: { 81: ok = 0; 82: break; 83: } 84: if( num < p2[i] ) 85: break; 86: } 87: if( ok ) 88: my_primes[count++] = num ; 89: } 90: out("result", start, my_primes:count ); 91: } 92: } 93: 94: int 95: init_prime( int primes[MAX], int p2[MAX] ) 96: { 97: primes[0] = 2; p2[0] = 2 * 2; 98: primes[1] = 3; p2[1] = 3 * 3; 99: return( 2 ); 100: }
source | pipe_seg-3 | pipe_seg-5 | pipe_seg-7 | .... | sink
("seg", 宛先, ストリームのインデックス, 整数)たとえば、source の出力(主に pipe_seg-3の入力)は、次のようになる。
("seg", 3, 0, 5 ) // sink が in ("seg", 3, 1, 7 ) // 以下 pipe_seg-3 が in する ("seg", 3, 2, 9 ) ("seg", 3, 3, 11 ) ("seg", 3, 3, 13 ) ...pipe_seg-3 の出力は、次のようになる。
("seg", 5, 0, 7 ) // sink が in する ("seg", 5, 1, 11 ) // 以下 pipe_seg-5 が in する ("seg", 5, 2, 13 ) ...最終的には、次のようなデータがタプル空間に残される。
("source", 1, 2) ("pipe seg", 2, 3) ("pipe seg", 3, 5) ("pipe seg", 4, 7) ... ("sink", MaxIndex, MaxPrime)
この方法は、結果並列や手順並列法よりも、並列性が低い。
1: /* 2: * prime-specialists.c 3: */ 4: 5: #define LIMIT 1000 6: 7: main() 8: { 9: eval ("soruce", source()); 10: eval ("sink", sink()); 11: } 12: 13: source() 14: { 15: int i, out_index=0; 16: for( i=0; i<LIMIT; i+= 2) 17: out("seg", 3, out_index++, i ); 18: out("seg", 3, out_index, 0 ); 19: } 20: 21: sink() 22: { 23: int in_index=0, num, prime=3, prime_count=2; 24: while( 1 ) 25: { 26: in("seg", prime, in_index++, ?num); 27: if( num == 0 ) 28: break; 29: if( num % prime != 0 ) 30: { 31: prime_count ++ ; 32: if( num * num < LIMIT ) 33: { 34: eval ("pipe seg", pipe_seg(prime, num, in_index)); 35: prime = num; 36: in_index = 0; 37: } 38: } 39: } 40: printf("count: %d\n", prime_count ); 41: } 42: 43: pipe_seg( int prime, int next, int in_index ) 44: { 45: int num, out_index=0; 46: while( 1 ) 47: { 48: in("seg", prime, in_index++, ?num ); 49: if( num == 0 ) 50: break; 51: if( num % prime != 0 ) 52: out("seg", next, out_index++, num); 53: } 54: out("seg", next, out_index, num ); 55: }
Linda | Rinda | 説明 |
タプル() | 配列[] | タプル空間に置くことができるデータ構造 |
out | write | タプルをタプル空間内に生成する。 |
in | take | タプルを取り去る |
rd | read | in/takeと似ているが、タプルがタプル空間に残る。 |
Rinda が提供する空間の特徴
===
で等しい。
% ruby -e 'p 1 === 1'true % ruby -e 'p Integer === 1'
true % ruby -e 'p String === 1'
false % ruby -e 'p /[abc]xy/ === "axy"'
true % ruby -e 'p /[abc]xy/ === "Axy"'
false %
["Message Box", "Hello" ]
1: #!/usr/bin/env ruby 2: # make-space.rb -- Make a tuple space and print its URI 3: 4: require 'rinda/tuplespace' 5: 6: def usage() 7: $stderr.printf("Usage: %% %s portno\n", $0) 8: exit( 1 ) 9: end 10: 11: def main(argv) 12: if( argv.length != 1 ) 13: usage() 14: end 15: portno = argv[0] 16: space = Rinda::TupleSpace.new() 17: DRb.start_service("druby://:"+portno, space) 18: uri = DRb.uri() 19: $stdout.printf("%s\n",uri) 20: $stdout.printf("Type ^C to stop this program.\n") 21: DRb.thread.join() 22: end 23: 24: main(ARGV)
1: #!/usr/bin/env ruby 2: # mbox-writer.rb -- Write a message to the message box in a tupple space. 3: 4: require 'rinda/tuplespace' 5: 6: def usage() 7: $stderr.printf("Usage: %% %s uri message\n", $0) 8: exit( 1 ) 9: end 10: 11: def main(argv) 12: if( argv.length != 2 ) 13: usage() 14: end 15: uri = argv[0] 16: message = argv[1] 17: 18: space = DRbObject.new_with_uri( uri ) 19: 20: tuple = ["Message Box", message ] 21: space.write( tuple ) 22: printf("mbox-writer: wrote[%s]\n",message) 23: end 24: 25: main(ARGV)
1: #!/usr/bin/env ruby 2: # mbox-reader.rb -- Read a message from a message box in a tuple space. 3: 4: require 'rinda/tuplespace' 5: 6: def usage() 7: $stderr.printf("Usage: %% %s uri\n", $0) 8: exit( 1 ) 9: end 10: 11: def main(argv) 12: if( argv.length != 1 ) 13: usage() 14: end 15: uri = argv[0] 16: DRb.start_service() 17: space = DRbObject.new_with_uri( uri ) 18: 19: template = ["Message Box",nil] 20: tuple = space.read( template ) 21: message = tuple[1] 22: p tuple # for debug 23: printf("mbox-reader: read [%s]\n", message ) 24: end 25: 26: main(ARGV)space.read()で、タプル空間からタプルを取り出す。 最初の引数は、テンプレートである。 read() は、空間からテンプレートとマッチするエントリを読み出す。
read() は、マッチするエントリがなければ、タイムアウトするまで待つ。 第2引数に秒単位で待ち時間を指定できる。
注意:連続する read() が同じオブジェクトを返す保証はない。
1: #!/usr/bin/env ruby 2: # mbox-taker.rb -- Take a message from a message box in a tuple space. 3: 4: require 'rinda/tuplespace' 5: 6: def usage() 7: $stderr.printf("Usage: %% %s uri\n", $0) 8: exit( 1 ) 9: end 10: 11: def main(argv) 12: if( argv.length != 1 ) 13: usage() 14: end 15: uri = argv[0] 16: DRb.start_service() 17: space = DRbObject.new_with_uri( uri ) 18: 19: template = ["Message Box",nil] 20: tuple = space.take( template ) 21: message = tuple[1] 22: p tuple # for debug 23: printf("mbox-taker: took [%s]\n", message ) 24: end 25: 26: main(ARGV)take() は、read() と同じだが、エントリを空間から取り去る所が異なる。複 数の take() が重なったとしても、エントリは1つにしか取られない。
% diff mbox-reader.rb mbox-taker.rb2c2 < # mbox-reader.rb -- Read a message from a message box in a tuple space. --- > # mbox-taker.rb -- Take a message from a message box in a tuple space. 20c20 < tuple = space.read( template ) --- > tuple = space.take( template ) 23c23 < printf("mbox-reader: read [%s]\n", message ) --- > printf("mbox-taker: took [%s]\n", message ) %
![]()
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/make-space.rb実行には、Ruby の 1.8 以降が必要。drb.rb を含んだもの。% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/mbox-writer.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/mbox-reader.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-08/ex/mbox-taker.rb
% chmod +x *.rb
% ls -l
total 32 -rwxr-xr-x 1 yas yas 463 2 7 23:42 make-space.rb -rwxr-xr-x 1 yas yas 485 2 7 23:48 mbox-reader.rb -rwxr-xr-x 1 yas yas 484 2 8 00:06 mbox-taker.rb -rwxr-xr-x 1 yas yas 406 2 7 23:40 mbox-writer.rb % which ruby
/usr/local/bin/ruby % ruby -v
ruby 1.8.6 (2007-03-13 patchlevel 0) [universal-darwin8.0] %
![]()
% ./make-space.rb 1231引数のポート番号は、ぶつからないような番号にする。 自動的に終了しないので、実験が終わったら ^C (Control-C) で殺す。druby://iris:1231 Type ^C to stop this program. (最後に ^C で止める)
Writer を動作させる。
% ./mbox-writer.rb druby://iris:1231 hello最後のウインドウで Reader や Taker を動作させる。mbox-writer: wrote[hello] % ./mbox-writer.rb druby://iris:1231 hi
mbox-writer: wrote[hi] %
![]()
% ./mbox-reader.rb druby://iris:12313回目の take は止まる。別ウインドウで write すれば、先に進む。["Message Box", "hello"] mbox-reader: read [hello] % ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"] mbox-reader: read [hello] % ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"] mbox-reader: read [hello] % ./mbox-taker.rb druby://iris:1231
["Message Box", "hello"] mbox-taker: took [hello] % ./mbox-taker.rb druby://iris:1231
["Message Box", "hi"] mbox-taker: took [hi] % ./mbox-taker.rb druby://iris:1231
...
タプルがない状態先に take/read すると、止まる。この状態で、write すれ ば、read/take が終了する。
interface JavaSpace を実現したオブジェクト
Linda | JavaSpace | 説明 |
out | write | タプルをタプル空間内に生成する。 |
in | take | タプルを取り去る |
rd | read | in/takeと似ているが、タプルがタプル空間に残る。 |
write, read, take を使う部分のプログラムは簡単だが、space を利用可能に するのには苦労する。
次の2つのプログラムを作成する。
(1) 初期化プログラム String url = argv[0]; tuple = [url,0] space.write( tuple ); (2) アクセスされた時に動作するプログラム 値を増やす。現在の値を画面に表示する。