並行分散ソフトウェア/並列分散ソフトウェア 電子・情報工学系 新城 靖 <yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/pdsoft-2005/2006-02-10
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/
並列処理の最終結果に焦点を当る。
例: 家の建築:
例: 家の建築:
問題にあったものを使う。
実際の家の建築では、全部の方法が使われている。
うまく行く例:ベクトルの足し算: S[i] = A[i] + B[i]
M[][0] に最初の位置。
position(i,j), 繰り返し j での 物体 i の位置を計算する。
position(X,j-1) の終了を待つ。
プロセスがデータに化けて行く。
物体に対応したプロセスを作る。手順(ワーカの仕事):集合に含まれている全ての物体について、次の位置を 計算する。
マスタで、N 個の物体を作る。
ワーカを作る。ワーカの数は、N 個ではなくて、もっと少ない(CPU数と同じに する)。
物体の位置を分散データ構造体(共有メモリ)に置く。
解決1:ライブデータ構造体を、受動的な構造体に書き換える。プロセスを複数 の構造体に対応させる。
問題2:分散データ構造体で書いたプログラム(共有空間が必要)が、NORMA で うまく動かない。
解決:メッセージ・パッシングに変換する。
Carriero と Gelernter の主張:分散データ構造体がいい。
Java では、スレッド、RMI、Javaspaces の順に導入された。
モニタは、メッセージパッシングの仲間か分散データ構造体か。
タプルペースモデル(tupple space model)で分散データ構造体を支援。(メッ セージ・パッシングやライブデータ構造体的なプログラムも書ける。)
タプルは、型付きの値の並び。
タプルの例:
("a string", 10.01, 17, 10)
(0,1)
2種類のタプル
out("a string", 10.01, 17, x)
in("a string", ?f, ?i, y)
「?」付のものは、formal。型が同じものとマッチする。 ついていないものは、actual。型と値が同じものとマッチする。
eval("e", 7, exp(7))
in(), out() で同期がとられる。
タプルの形式: (name,val)
読込み: rd(name,?val) 変更: in(name,?val) val = ... val ... ; out(name,val)
P命令: in("sem-1") V命令: out("sem-1");同じタプルを out したら溜る。
注意:in() したデータは、1プロセスでしかアクセスされないので、セマフォ などによるロックは不要。
仕事を入れる: out("task",TaskDescription) 仕事を取り出す: in("task",?NewTask)
for( i=0 ; i<N; i++ ) { func(i,args); }並列:
for( i=0 ; i<N; i++ ) { eval ("loop-33", func(i,args) ); // プロセス生成 } for( i=0 ; i<N; i++ ) { in("loop-33", 1 ); // 待ち } func(i,args) { ... return( 1 ); }
n プロセスのバリア
初期化: out("barrier-37",n)
各プロセス: 1減らして、0になるのを待つ。 in("barrier-37",?val) out("barrier-37",val-1) rd("barrier-37",0)
A[10]; ("A",0,val00) ("A",1,val01) ("A",2,val02) ... ("A",9,val99)
A[10][10]; ("A",0,0,val00) ("A",0,1,val01) ("A",0,2,val02) ... ("A",9,9,val99)
ストリームデータ ("stream",0,val0) ("stream",1,val1) ("stream",2,val2) ... ポインタ ("stream","head",0) ("stream","tail",0) ストリームに要素を追加: int index; in("stream","tail",?index); out("stream","tail",index+1); out("stream",index,new_element); ストリームから要素を取り出す: int index; in("stream","head",?index); out("stream","head",index+1); in("stream",index,?element);これは、複数source、複数sink。
source、sinkが1つなら、head, tail をタプルスペースに置かなくてもよい。
head の代わりに、局所変数でアクセスする。
ストリームから要素を取り出す: int index=0 ; while( ... ) { rd("stream",index++,?element); }
("primes", 2, 1 ) // primes[2] = 1 ; ("primes", 3, 1 ) // primes[3] = 1 ; ("primes", 4, 0 ) // primes[4] = 0 ; ("primes", 5, 1 ) // primes[5] = 1 ; ("primes", 6, 0 ) // primes[6] = 0 ; ("primes", 7, 1 ) // primes[6] = 1 ; ..
プログラム
1: /* 2: * prime-results.c 3: */ 4: 5: #define LIMIT 1000 6: 7: main() 8: { 9: int count, i, ok; 10: for( i=2; i<=LIMIT; i++ ) 11: eval ("primes", i, is_prime(i) ); 12: count = 0 ; 13: for( i=2; i<=LIMIT; i++ ) 14: { 15: rd("primes", i, ?ok); 16: if( ok ) 17: count ++; 18: } 19: printf("%d.\n", count ); 20: } 21: 22: is_prime( int me ) 23: { 24: int i, limit, ok; 25: double sqrt(); 26: limit = (int) sqrt( (double)me ) + 1; 27: for( i=2; i<limit; i++ ) 28: { 29: rd("primes", i, ?ok); 30: if( ok && (me%i == 0) ) 31: return( 0 ); 32: } 33: return( 1 ); 34: }2 から sqrt(n) 以下の素数を rd() して得、割ってみて剰りを調べる。
利点
問題点
while( 1 ) { in("next task", start ); out("next task", start + GRAIN); start から start + GRAIN の間の素数を探す; }
("primes", 0, 2, 4 ) // primes[0] = 2 ; p2[0] = 4 ; ("primes", 1, 3, 9 ) // primes[1] = 3 ; p2[1] = 9 ; ("primes", 2, 5, 25 ) // primes[2] = 5 ; p2[2] = 25 ; ("primes", 3, 7, 49 ) // primes[3] = 7 ; p2[3] = 49 ;
("result", start, 1番目の固まり) ("result", start+GRAIN, 2番目の固まり) ("result", start+GRAIN*3, 3番目の固まり) ("result", start+GRAIN*4, 4番目の固まり)各「固まり」は、配列で、最後に終り -1 が入っている。 マスタは、この ("result",,) のストリームを読込み、("primes",,,) のスト リームを作る。
1: /* 2: * prime-agenda.c 3: */ 4: 5: #define LIMIT 1000 6: #define GRAIN 100 7: #define MAX 1000 8: 9: main( int argc, char *argv[], char *envp[] ) 10: { 11: int eot, first_num, i, length, new_primes[GRAIN], np2; 12: int num, num_primes, num_workers, primes[MAX], p2[MAX]; 13: num_workers = atoi( argv[1] ); 14: for( i=0; i< num_workers; i++ ) 15: eval ("worker", worker() ); 16: 17: num_primes = init_primes( primes, p2 ); 18: first_num = primes[num_primes-1] + 2; 19: out("next task", first_num); 20: 21: eot = 0 ; /* end of table */ 22: for( num=first_num; num<LIMIT; num+=GRAIN) 23: { 24: in("result", num, ?new_primes:length ); 25: for( i=0; i<length; i++, num_primes++) 26: { 27: primes[num_primes] = new_primes[i]; 28: if( !eot ) 29: { 30: np2 = new_primes[i] * new_primes[i] ; 31: if( np2 > LIMIT ) 32: { 33: eot = 1 ; 34: np2 = -1 ; 35: } 36: out("primes", num_primes, new_primes[i], np2 ); 37: } 38: } 39: } 40: for( i=0; i<num_workers; i++) 41: in("worker", ?int); 42: printf("%d.\n", num_primes ); 43: } 44: 45: worker() 46: { 47: int count, eot, i, limit, num, num_primes, ok, start; 48: int my_primes[GRAIN], primes[MAX], p2[MAX]; 49: num_primes = init_primes( primes, p2 ); 50: 51: eot = 0; 52: while( 1 ) 53: { 54: in("next task", ?num ); 55: if( num == -1 ) 56: { 57: out("next task", -1 ); 58: return; 59: } 60: limit = num + GRAIN; 61: out("next task", (limit>LIMIT) ? -1 : limit ); 62: if( limit > LIMIT ) 63: limit = LIMIT ; 64: 65: start = num; 66: for( count=0; num<limit; num+=2 ) 67: { 68: while( !eot && num > p2[num_primes-1] ) 69: { 70: rd("primes", num_primes, 71: ?primes[num_primes], ?p2[num_primes] ); 72: if( p2[num_primes] < 0 ) 73: eot = 1 ; 74: else 75: num_primes++; 76: } 77: for( i=0, ok=1; i<num_primes; i++ ) 78: { 79: if( num%primes[i] == 0 ) 80: { 81: ok = 0; 82: break; 83: } 84: if( num < p2[i] ) 85: break; 86: } 87: if( ok ) 88: my_primes[count++] = num ; 89: } 90: out("result", start, my_primes:count ); 91: } 92: } 93: 94: int 95: init_prime( int primes[MAX], int p2[MAX] ) 96: { 97: primes[0] = 2; p2[0] = 2 * 2; 98: primes[1] = 3; p2[1] = 3 * 3; 99: return( 2 ); 100: }
source | pipe_seg-3 | pipe_seg-5 | pipe_seg-7 | .... | sink
("seg", 宛先, ストリームのインデックス, 整数)たとえば、source の出力(主に pipe_seg-3の入力)は、次のようになる。
("seg", 3, 0, 5 ) // sink が in ("seg", 3, 1, 7 ) // 以下 pipe_seg-3 が in する ("seg", 3, 2, 9 ) ("seg", 3, 3, 11 ) ("seg", 3, 3, 13 ) ...pipe_seg-3 の出力は、次のようになる。
("seg", 5, 0, 7 ) // sink が in する ("seg", 5, 1, 11 ) // 以下 pipe_seg-5 が in する ("seg", 5, 2, 13 ) ...最終的には、次のようなデータがタプルスペースに残される。
("source", 1, 2) ("pipe seg", 2, 3) ("pipe seg", 3, 5) ("pipe seg", 4, 7) ... ("sink", MaxIndex, MaxPrime)
この方法は、結果並列や手順並列法よりも、並列性が低い。
1: /* 2: * prime-specialists.c 3: */ 4: 5: #define LIMIT 1000 6: 7: main() 8: { 9: eval ("soruce", source()); 10: eval ("sink", sink()); 11: } 12: 13: source() 14: { 15: int i, out_index=0; 16: for( i=0; i<LIMIT; i+= 2) 17: out("seg", 3, out_index++, i ); 18: out("seg", 3, out_index, 0 ); 19: } 20: 21: sink() 22: { 23: int in_index=0, num, prime=3, prime_count=2; 24: while( 1 ) 25: { 26: in("seg", prime, in_index++, ?num); 27: if( num == 0 ) 28: break; 29: if( num % prime != 0 ) 30: { 31: prime_count ++ ; 32: if( num * num < LIMIT ) 33: { 34: eval ("pipe seg", pipe_seg(prime, num, in_index)); 35: prime = num; 36: in_index = 0; 37: } 38: } 39: } 40: printf("count: %d\n", prime_count ); 41: } 42: 43: pipe_seg( int prime, int next, int in_index ) 44: { 45: int num, out_index=0; 46: while( 1 ) 47: { 48: in("seg", prime, in_index++, ?num ); 49: if( num == 0 ) 50: break; 51: if( num % prime != 0 ) 52: out("seg", next, out_index++, num); 53: } 54: out("seg", next, out_index, num ); 55: }
interface JavaSpace を実現したオブジェクト
---------------------------------------------------------------------- Linda JavaSpace 説明 ---------------------------------------------------------------------- out write タプルをタプル空間内に生成する。 in take タプルを取り去る rd read in/takeと似ているが、タプルがタプルスペースに残る。 ----------------------------------------------------------------------ライブタプル(eval命令)は、ない。
JavaSpaces が提供する空間の特徴
net/jini/core/entry/Entry.java: public interface Entry extends java.io.Serializable { }
net/jini/space/JavaSpace.java: public interface JavaSpace { Lease write(Entry entry, Transaction txn, long lease) throws TransactionException, RemoteException; long NO_WAIT = 0; Entry read(Entry tmpl, Transaction txn, long timeout) throws UnusableEntryException, TransactionException, InterruptedException, RemoteException; Entry readIfExists(Entry tmpl, Transaction txn, long timeout) throws UnusableEntryException, TransactionException, InterruptedException, RemoteException; Entry take(Entry tmpl, Transaction txn, long timeout) throws UnusableEntryException, TransactionException, InterruptedException, RemoteException; Entry takeIfExists(Entry tmpl, Transaction txn, long timeout) throws UnusableEntryException, TransactionException, InterruptedException, RemoteException; EventRegistration notify(Entry tmpl, Transaction txn, RemoteEventListener listener, long lease, MarshalledObject handback) throws TransactionException, RemoteException; Entry snapshot(Entry e) throws RemoteException; }
notify() では、マッチするテンプレートが write された時、 RemoteEventListener の notify(RemoteEvent theEvent) が呼ばれる。
snapshot() では、エントリのスナップショットが返される。元のエントリが 更新されても、変化しない。read(), take() のテンプレート用。
public class PDSoftSpaceFinder extends java.lang.Object { public PDSoftSpaceFinder(); public static net.jini.space.JavaSpace getSpace(java.lang.String); public static net.jini.space.JavaSpace getSpace(); }
1: // From the JavaSpaces book 2: //package jsbook.chapter1.helloWorld; 3: 4: import net.jini.core.entry.Entry; 5: 6: public class Message implements Entry { 7: public String content; 8: 9: public Message() { 10: } 11: }
1: // 2: // HelloWriter.java 3: // 4: 5: import net.jini.space.JavaSpace; 6: 7: class HelloWriter { 8: public static void main(String[] argv) 9: { 10: JavaSpace space = PDSoftSpaceFinder.getSpace(); 11: if( space == null ) 12: { 13: System.err.println("No JavaSpace found."); 14: System.exit( 1 ); 15: } 16: Message msg = new Message(); 17: msg.content = "Hello" ; 18: try 19: { 20: space.write(msg, null, net.jini.core.lease.Lease.FOREVER); 21: System.out.println("HelloWriter: wrote["+msg.content+"]"); 22: } 23: catch( Exception e ) 24: { 25: System.err.println("JavaSpace write error "+e.getMessage()); 26: e.printStackTrace(); 27: System.exit( -1 ); 28: } 29: System.exit( 0 ); 30: } 31: }
Lease write(Entry entry, Transaction txn, long lease) throws TransactionException, RemoteException;トランザクションは、複数の操作をグループ化するもの。null を 指定すれば、その機能は使われない。
lease は、時間を指定する。その時間だけは、空間がそのエントリを記憶して いる。空間が Garbage Collection に使う。Lease.FOREVER は、無限に覚えて いることを意味する。単位は、ミリ秒。
(transient-outrigger.jar では、サーバを落とすと消える。)
1: // 2: // HelloReader.java 3: // 4: 5: import net.jini.space.JavaSpace; 6: 7: public class HelloReader { 8: public static void main(String[] argv) 9: { 10: JavaSpace space = PDSoftSpaceFinder.getSpace(); 11: if( space == null ) 12: { 13: System.err.println("No JavaSpace found."); 14: System.exit( 1 ); 15: } 16: 17: Message template = new Message(); 18: Message result; 19: try 20: { 21: result = (Message)space.read(template, null, Long.MAX_VALUE); 22: System.out.println("HelloReader: read ["+result.content+"]"); 23: } 24: catch( Exception e ) 25: { 26: System.err.println("JavaSpace read error "+e.getMessage()); 27: e.printStackTrace(); 28: System.exit( -1 ); 29: } 30: 31: System.exit( 0 ); 32: } 33: }
Entry read(Entry tmpl, Transaction txn, long timeout) throws UnusableEntryException, TransactionException, InterruptedException, RemoteException;read の最初の引数は、テンプレートである。 read は、空間からテンプレートとマッチするエントリを読み出す。
次の2つの規則を満たした時に、テンプレートとエントリはマッチする
public class Vegetable implements Entry { } public class Fruit implements Entry { } public class Apple extends Fruit { } public class Orange extends Fruit { }
「同じ値」は、serialize してバイトレベルで同じという意味。エントリは、 空間にある時にはserialize された形で保存されている。
null をワイルド・カードに使う問題点は、「本当に null の値を持つエントリ 探す」ということができないこと。 区別したい時には、Boolean を添える。
public class NotePtr implements Entry { public Boolean ptrIsNull; public Node ptr; } ... NodePtr template = new NodePtr(); template.ptrIsNull = new Boolean(true); template.ptr = null; // for completeness; null by defaultnull をワイルドカードに使ったので、空間に置くエントリのフィールドは、 オブジェクトにする。int, boolean, float, double などは、Integer, Boolean, Float, Double などの wrappe クラスを使う。
read() は、マッチするエントリがなければ、timeout するまで待つ。 Long.MAX_VALUE は、無限に待つことを意味する。待ちたくない時には、 JavaSpaces.NO_WAIT を使うか、raedIfExists() を使う。
注意:連続する read() が同じオブジェクトを返す保証はない。
1: // 2: // HelloTaker.java 3: // 4: 5: import net.jini.space.JavaSpace; 6: 7: public class HelloTaker { 8: public static void main(String[] argv) 9: { 10: JavaSpace space = PDSoftSpaceFinder.getSpace(); 11: if( space == null ) 12: { 13: System.err.println("No JavaSpace found."); 14: System.exit( 1 ); 15: } 16: 17: Message template = new Message(); 18: Message result; 19: try 20: { 21: result = (Message)space.take(template, null, Long.MAX_VALUE); 22: System.out.println("HelloTaker: took ["+result.content+"]"); 23: } 24: catch( Exception e ) 25: { 26: System.err.println("JavaSpace read error "+e.getMessage()); 27: e.printStackTrace(); 28: System.exit( -1 ); 29: } 30: 31: System.exit( 0 ); 32: } 33: } % diff HelloReader.npr HelloTaker.npr 2c2 < 2: // HelloReader.java --- > 2: // HelloTaker.java 7c7 < 7: public class HelloReader { --- > 7: public class HelloTaker { 21,22c21,22 < 21: result = (Message)space.read(template, null, Long.MAX_VALUE); < 22: System.out.println("HelloReader: read ["+result.content+"]"); --- > 21: result = (Message)space.take(template, null, Long.MAX_VALUE); > 22: System.out.println("HelloTaker: took ["+result.content+"]"); %take() は、read() と同じだが、エントリを空間から取り去る所が異なる。 複数の take() が重なったとしても、 エントリは1つにしか取られない。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2006-02-10/ex/PDSoftSpaceFinder.java% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2006-02-10/ex/Message.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2006-02-10/ex/HelloWriter.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2006-02-10/ex/HelloReader.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2006-02-10/ex/HelloTaker.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/cdsoft-2005/2006-02-10/ex/Makefile
% make hello
javac -classpath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar PDSoftSpaceFinder.java Note: PDSoftSpaceFinder.java uses or overrides a deprecated API. Note: Recompile with -deprecation for details. javac -classpath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar Message.java javac -classpath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar HelloWriter.java javac -classpath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar HelloReader.java javac -classpath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar HelloTaker.java %
![]()
% make run-rmiregistryrmiregistry で使うポート番号は、ぶつからないような番号を port に設定して使うとよい。 rmiregistry は自動的に終了しないので、実験が終わったら ^C (Control-C) で殺す。rmiregistry 8080 (最後に ^C で止める)
% make run-TransientSpace残りの一方のウインドウで Writer を動作させる。java -Djava.security.policy=/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/policy/policy.transient-outrigger -Djava.rmi.server.codebase=file:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/outrigger-dl.jar -classpath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar -Dcom.sun.jini.use.registry=yes -Dcom.sun.jini.outrigger.spaceName=JavaSpace -Dcom.sun.jini.rmiRegistryPort=8080 -jar /home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrigger.jar Warning: file url in codebase component may cause problems (file:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/outrigger-dl.jar) Warning: Your are using the com.sun.jini.use.registry property in order to bind this Outrigger server into an RMI registry. Direct support for binding Outrigger servers into RMI registries is going to be removed in a future version of the Jini(TM) Software Kit (JSK). (最後に ^C で止める)
% set prompt="Writer% "最後のウインドウで Reader や Taker を動作させる。Writer% make run-HelloWriter HelloWriter: wrote[Hello] Writer% make run-HelloWriter HelloWriter: wrote[Hello] Writer%
% make -n run-HelloReader) 3回目の take は止まる。別ウインドウで write すれば、先に進む。java -Djava.security.policy=/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_ 1/policy/policy.transient-outrigger -Djava.rmi.server.codebase=file:/h ome/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/outrigger-dl.jar -classp ath .:/home/lab/Denjo/yas/sie/cdsoft-2005/jini1_1/lib/transient-outrig ger.jar -Dcom.sun.jini.use.registry=yes -Dcom.sun.jini.outrigger.space Name=JavaSpace -Dcom.sun.jini.rmiRegistryPort=8080 HelloReader % make run-HelloReader
HelloReader: read [Hello] % make run-HelloReader
HelloReader: read [Hello] % make run-HelloReader
HelloReader: read [Hello] % make run-HelloReader
HelloReader: read [Hello] %
% make run-HelloTaker
HelloTaker: took [Hello] % make run-HelloTaker
HelloTaker: took [Hello] % make run-HelloTaker
^Cmake: *** [run-HelloTaker] Error 130 %
![]()
タプルがない状態先に take/read すると、止まる。この状態で、write すれ ば、read/take が終了する。
read() でのマッチングは、serialize された形で比較される。
空間に入れて出すと、オブジェクトが増えることがある。
public class E1 implements Entry { public Integer obj1; public Integer obj2; } Integer obj = Integer(0); E1 e = new E1(); e.obj1 =obj; e.obj2 =obj;
引数なしのコンストラクタが必要である。
read(), take() でのエントリの復元
read(), take() のテンプレートがループの中で不変な場合、 snapshot() を作ると効率が良くなる。
古い (jini1_1) の policy/policy.transient-outrigger は、弱い。