並行システム
システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系
新城 靖
<yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/
http://www2a.biglobe.ne.jp/~seki/ruby/d208.html,
関 将俊氏による Rinda の解説。本のサンプルコードと説明含む。
並列処理の最終結果に焦点を当る。
例: 家の建築:
例: 家の建築:
問題にあったものを使う。
実際の家の建築では、全部の方法が使われている。
うまく行く例:ベクトルの足し算: S[i] = A[i] + B[i]
M[][0] に最初の位置。
position(i,j), 繰り返し j での 物体 i の位置を計算する。
position(X,j-1) の終了を待つ。
プロセスがデータに化けて行く。
物体に対応したプロセスを作る。手順(ワーカの仕事):集合に含まれている全ての物体について、次の位置を 計算する。
マスタで、N 個の物体を作る。
ワーカを作る。ワーカの数は、N 個ではなくて、もっと少ない(CPU数と同じに する)。
物体の位置を分散データ構造体(共有メモリ)に置く。

図? 手法の変換
解決1:ライブデータ構造体を、受動的な構造体に書き換える。プロセスを複数 の構造体に対応させる。
問題2:分散データ構造体で書いたプログラム(共有空間が必要)が、NORMA で うまく動かない。
解決:メッセージ・パッシングに変換する。
Carriero と Gelernter の主張:分散データ構造体がいい。
Java では、スレッド、RMI、Javaspaces の順に導入された。
モニタは、メッセージパッシングの仲間か分散データ構造体か。
タプルペースモデル(tuple space model)で分散データ構造体を支援。(メッ セージ・パッシングやライブデータ構造体的なプログラムも書ける。)
タプルは、型付きの値の並び。
タプルの例:
("a string", 10.01, 17, 10)
(0,1)
2種類のタプル
out("a string", 10.01, 17, x)
in("a string", ?f, ?i, y)
「?」付のものは、formal。型が同じものとマッチする。 ついていないものは、actual。型と値が同じものとマッチする。
eval("e", 7, exp(7))
新しくプロセス(スレッド)が作られ、exp(7) を計算しはじめる。プロセスが
終了すると、最終結果は、値に変わり、out() されたのと同じになる。
タプルの形式: (name,val)
読込み:
rd(name,?val)
変更:
in(name,?val)
val = val + 1 ;
out(name,val)
P命令:
in("sem-1")
V命令:
out("sem-1");
同じタプルを out したら溜る。
注意:in() したデータは、1プロセスでしかアクセスされないので、セマフォ などによるロックは不要。
仕事を入れる:
out("task",TaskDescription)
仕事を取り出す:
in("task",?NewTask)
for( i=0 ; i<N; i++ )
{
func(i,args);
}
並列:
for( i=0 ; i<N; i++ )
{
eval ("loop-33", func(i,args) ); // プロセス生成
}
for( i=0 ; i<N; i++ )
{
in("loop-33", 1 ); // 待ち
}
func(i,args)
{
...
return( 1 );
}
マルチスレッドの マスタ・スレーブ(バリア付き) 参照。
n プロセスのバリア
初期化:
out("barrier-37",n)
各プロセス: 1減らして、0になるのを待つ。
in("barrier-37",?val)
out("barrier-37",val-1)
rd("barrier-37",0)
A[10];
("A",0,val00)
("A",1,val01)
("A",2,val02)
...
("A",9,val99)
A[10][10];
("A",0,0,val00)
("A",0,1,val01)
("A",0,2,val02)
...
("A",9,9,val99)
ストリームデータ
("stream",0,val0)
("stream",1,val1)
("stream",2,val2)
...
ポインタ
("stream","head",0)
("stream","tail",0)
ストリームに要素を追加:
int index; // ローカル変数
in("stream","tail",?index);
out("stream","tail",index+1);
out("stream",index,new_element);
ストリームから要素を取り出す:
int index; // ローカル変数
in("stream","head",?index);
out("stream","head",index+1);
in("stream",index,?element);
これは、複数source、複数sink。
source、sinkが1つなら、head, tail をタプル空間に置かなくてもよい。
head の代わりに、局所変数でアクセスする。
ストリームから要素を取り出す:
int index=0 ;
while( ... )
{
rd("stream",index++,?element);
}
rd() しかなされず、out() する人がいないので、タプル空間にストリームのデー
タが残ってしまう。
interface JavaSpace を実現したオブジェクト
| Linda | JavaSpace | 説明 |
| out | write | タプルをタプル空間内に生成する。 |
| in | take | タプルを取り去る |
| rd | read | in/takeと似ているが、タプルがタプル空間に残る。 |
write, read, take を使う部分のプログラムは簡単だが、space を利用可能に するのには苦労する。
JavaSpaces が提供する空間の特徴
net/jini/core/entry/Entry.java:
public interface Entry extends java.io.Serializable {
}
net/jini/space/JavaSpace.java:
public interface JavaSpace {
Lease write(Entry entry, Transaction txn, long lease)
throws TransactionException, RemoteException;
long NO_WAIT = 0;
Entry read(Entry tmpl, Transaction txn, long timeout)
throws UnusableEntryException, TransactionException,
InterruptedException, RemoteException;
Entry readIfExists(Entry tmpl, Transaction txn, long timeout)
throws UnusableEntryException, TransactionException,
InterruptedException, RemoteException;
Entry take(Entry tmpl, Transaction txn, long timeout)
throws UnusableEntryException, TransactionException,
InterruptedException, RemoteException;
Entry takeIfExists(Entry tmpl, Transaction txn, long timeout)
throws UnusableEntryException, TransactionException,
InterruptedException, RemoteException;
EventRegistration
notify(Entry tmpl, Transaction txn, RemoteEventListener listener,
long lease, MarshalledObject handback)
throws TransactionException, RemoteException;
Entry snapshot(Entry e) throws RemoteException;
}
notify() では、マッチするテンプレートが write された時、 RemoteEventListener の notify(RemoteEvent theEvent) が呼ばれる。
snapshot() では、エントリのスナップショットが返される。元のエントリが 更新されても、変化しない。read(), take() のテンプレート用。
次のチュートリアルが参考になる。
[1] Qusay H. Mamoud: "Getting Started With JavaSpaces Technology: Beyond
Conventional Distributed Programming Paradigms",
July 12, 2005.
http://java.sun.com/developer/technicalArticles/tools/JavaSpaces/
[2] The Blitz Project: "Blitz HelloWorld Example".
http://www.dancres.org/bjspj/docs/docs/hello_example.html
両方とも、後者のサイトにある Lookup.java を使っている。
[3] "Jini Technology Starter Kit v2.1", 2005.
https://starterkit.dev.java.net/downloads/index.html
このキットには、Launch-All というプログラムが含まれている。(Jiniがイン ストールされたディレクトリ下のinstallverifyディレクトリにある)。以下の JavaSpaces の例題を実行する前に、このプログラムを実行する必要がある。
1: // From the JavaSpaces book
2: //package jsbook.chapter1.helloWorld;
3:
4: import net.jini.core.entry.Entry;
5:
6: public class Message implements Entry {
7: public String content;
8:
9: public Message() {
10: }
11: }
1: //
2: // HelloWriter.java
3: //
4:
5: import net.jini.space.JavaSpace;
6:
7: class HelloWriter {
8: public static void main(String[] argv)
9: {
10: Lookup finder = new Lookup(JavaSpace.class);
11: JavaSpace space = (JavaSpace) finder.getService();
12: if( space == null )
13: {
14: System.err.println("No JavaSpace found.");
15: System.exit( 1 );
16: }
17: Message msg = new Message();
18: msg.content = "Hello" ;
19: try
20: {
21: space.write(msg, null, net.jini.core.lease.Lease.FOREVER);
22: System.out.println("HelloWriter: wrote["+msg.content+"]");
23: }
24: catch( Exception e )
25: {
26: System.err.println("JavaSpace write error "+e.getMessage());
27: e.printStackTrace();
28: System.exit( -1 );
29: }
30: System.exit( 0 );
31: }
32: }
Lease write(Entry entry, Transaction txn, long lease)
throws TransactionException, RemoteException;
トランザクションは、複数の操作をグループ化するもの。null を
指定すれば、その機能は使われない。
lease は、時間を指定する。その時間だけは、空間がそのエントリを記憶して いる。空間が Garbage Collection に使う。Lease.FOREVER は、無限に覚えて いることを意味する。単位は、ミリ秒。
1: //
2: // HelloReader.java
3: //
4:
5: import net.jini.space.JavaSpace;
6:
7: public class HelloReader {
8: public static void main(String[] argv)
9: {
10: Lookup finder = new Lookup(JavaSpace.class);
11: JavaSpace space = (JavaSpace) finder.getService();
12: if( space == null )
13: {
14: System.err.println("No JavaSpace found.");
15: System.exit( 1 );
16: }
17:
18: Message template = new Message();
19: Message result;
20: try
21: {
22: result = (Message)space.read(template, null, Long.MAX_VALUE);
23: System.out.println("HelloReader: read ["+result.content+"]");
24: }
25: catch( Exception e )
26: {
27: System.err.println("JavaSpace read error "+e.getMessage());
28: e.printStackTrace();
29: System.exit( -1 );
30: }
31:
32: System.exit( 0 );
33: }
34: }
Entry read(Entry tmpl, Transaction txn, long timeout)
throws UnusableEntryException, TransactionException,
InterruptedException, RemoteException;
read の最初の引数は、テンプレートである。
read は、空間からテンプレートとマッチするエントリを読み出す。
次の2つの規則を満たした時に、テンプレートとエントリはマッチする
public class Vegetable implements Entry {
}
public class Fruit implements Entry {
}
public class Apple extends Fruit {
}
public class Orange extends Fruit {
}
「同じ値」は、serialize してバイトレベルで同じという意味。エントリは、 空間にある時にはserialize された形で保存されている。
null をワイルド・カードに使う問題点は、「本当に null の値を持つエントリ 探す」ということができないこと。 区別したい時には、Boolean を添える。
public class NotePtr implements Entry {
public Boolean ptrIsNull;
public Node ptr;
}
...
NodePtr template = new NodePtr();
template.ptrIsNull = new Boolean(true);
template.ptr = null; // for completeness; null by default
null をワイルドカードに使ったので、空間に置くエントリのフィールドは、
オブジェクトにする。int, boolean, float, double などは、Integer,
Boolean, Float, Double などの wrappe クラスを使う。
read() は、マッチするエントリがなければ、timeout するまで待つ。 Long.MAX_VALUE は、無限に待つことを意味する。待ちたくない時には、 JavaSpaces.NO_WAIT を使うか、raedIfExists() を使う。
注意:連続する read() が同じオブジェクトを返す保証はない。
1: //
2: // HelloTaker.java
3: //
4:
5: import net.jini.space.JavaSpace;
6:
7: public class HelloTaker {
8: public static void main(String[] argv)
9: {
10: Lookup finder = new Lookup(JavaSpace.class);
11: JavaSpace space = (JavaSpace) finder.getService();
12: if( space == null )
13: {
14: System.err.println("No JavaSpace found.");
15: System.exit( 1 );
16: }
17:
18: Message template = new Message();
19: Message result;
20: try
21: {
22: result = (Message)space.take(template, null, Long.MAX_VALUE);
23: System.out.println("HelloTaker: took ["+result.content+"]");
24: }
25: catch( Exception e )
26: {
27: System.err.println("JavaSpace read error "+e.getMessage());
28: e.printStackTrace();
29: System.exit( -1 );
30: }
31:
32: System.exit( 0 );
33: }
34: }
% diff HelloReader.java HelloTaker.java
2c2
< // HelloReader.java
---
> // HelloTaker.java
7c7
< public class HelloReader {
---
> public class HelloTaker {
22,23c22,23
< result = (Message)space.read(template, null, Long.MAX_VALUE);
< System.out.println("HelloReader: read ["+result.content+"]");
---
> result = (Message)space.take(template, null, Long.MAX_VALUE);
> System.out.println("HelloTaker: took ["+result.content+"]");
%
take() は、read() と同じだが、エントリを空間から取り去る所が異なる。
複数の take() が重なったとしても、
エントリは1つにしか取られない。
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/Lookup.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/Message.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/HelloWriter.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/HelloReader.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/HelloTaker.java
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/Makefile
% emacs Makefile (jinilibを、自分の環境に合わせて書き換える)
% make hello
javac -classpath .:(省略)/jini-ext.jar:(省略)/reggie.jar:(省略)/outrigger.jar Lookup.java
javac -classpath .:(省略)/jini-ext.jar:(省略)/reggie.jar:(省略)/outrigger.jar Message.java
javac -classpath .:(省略)/jini-ext.jar:(省略)/reggie.jar:(省略)/outrigger.jar HelloWriter.java
javac -classpath .:(省略)/jini-ext.jar:(省略)/reggie.jar:(省略)/outrigger.jar HelloReader.java
javac -classpath .:(省略)/jini-ext.jar:(省略)/reggie.jar:(省略)/outrigger.jar HelloTaker.java
% ls
HelloReader.class HelloWriter.class Makefile
HelloReader.java HelloWriter.java Message.class
HelloTaker.class Lookup.class Message.java
HelloTaker.java Lookup.java
%
ウインドウを2枚開く。
一方のウインドウで Writer を動作させる。
% set prompt="Writer% "
Writer% make run-HelloWriter
HelloWriter: wrote[Hello]
Writer%
もう一方のウインドウで Reader や Taker を動作させる。
% make run-HelloReader
HelloReader: read [Hello]
% make run-HelloReader
HelloReader: read [Hello]
% make run-HelloReader
HelloReader: read [Hello]
%
% make run-HelloTaker
HelloTaker: took [Hello]
% make run-HelloTaker
HelloTaker: took [Hello]
% make run-HelloTaker
(固まる)
タプルがない状態先に take/read すると、止まる。この状態で、write すれば、
read/take は先に進む。
Writer% make run-HelloWriter
HelloWriter: wrote[Hello]
Writer%
read() でのマッチングは、serialize された形で比較される。
空間に入れて出すと、オブジェクトが増えることがある。
public class E1 implements Entry {
public Integer obj1;
public Integer obj2;
}
Integer obj = Integer(0);
E1 e = new E1();
e.obj1 =obj;
e.obj2 =obj;
引数なしのコンストラクタが必要である。
read(), take() でのエントリの復元
read(), take() のテンプレートがループの中で不変な場合、 snapshot() を作ると効率が良くなる。
| Linda | Rinda | 説明 |
| タプル() | 配列[] | タプル空間に置くことができるデータ構造 |
| out | write | タプルをタプル空間内に生成する。 |
| in | take | タプルを取り去る |
| rd | read | in/takeと似ているが、タプルがタプル空間に残る。 |
Rinda が提供する空間の特徴
=== で等しい。
% ruby -e 'p 1 === 1'
true
% ruby -e 'p Integer === 1'
true
% ruby -e 'p String === 1'
false
% ruby -e 'p /[abc]xy/ === "axy"'
true
% ruby -e 'p /[abc]xy/ === "Axy"'
false
%
["Message Box", "Hello" ]
1: #!/usr/bin/env ruby
2: # make-space.rb -- Make a tuple space and print its URI
3:
4: require 'rinda/tuplespace'
5:
6: def usage()
7: $stderr.printf("Usage: %% %s portno\n", $0)
8: exit( 1 )
9: end
10:
11: def main(argv)
12: if( argv.length != 1 )
13: usage()
14: end
15: portno = argv[0]
16: space = Rinda::TupleSpace.new()
17: DRb.start_service("druby://:"+portno, space)
18: uri = DRb.uri()
19: $stdout.printf("%s\n",uri)
20: $stdout.printf("Type ^C to stop this program.\n")
21: DRb.thread.join()
22: end
23:
24: main(ARGV)
1: #!/usr/bin/env ruby
2: # mbox-writer.rb -- Write a message to the message box in a tupple space.
3:
4: require 'rinda/tuplespace'
5:
6: def usage()
7: $stderr.printf("Usage: %% %s uri message\n", $0)
8: exit( 1 )
9: end
10:
11: def main(argv)
12: if( argv.length != 2 )
13: usage()
14: end
15: uri = argv[0]
16: message = argv[1]
17:
18: space = DRbObject.new_with_uri( uri )
19:
20: tuple = ["Message Box", message ]
21: space.write( tuple )
22: printf("mbox-writer: wrote[%s]\n",message)
23: end
24:
25: main(ARGV)
1: #!/usr/bin/env ruby
2: # mbox-reader.rb -- Read a message from a message box in a tuple space.
3:
4: require 'rinda/tuplespace'
5:
6: def usage()
7: $stderr.printf("Usage: %% %s uri\n", $0)
8: exit( 1 )
9: end
10:
11: def main(argv)
12: if( argv.length != 1 )
13: usage()
14: end
15: uri = argv[0]
16: DRb.start_service()
17: space = DRbObject.new_with_uri( uri )
18:
19: template = ["Message Box",nil]
20: tuple = space.read( template )
21: message = tuple[1]
22: p tuple # for debug
23: printf("mbox-reader: read [%s]\n", message )
24: end
25:
26: main(ARGV)
space.read()で、タプル空間からタプルを取り出す。
最初の引数は、テンプレートである。
read() は、空間からテンプレートとマッチするエントリを読み出す。
read() は、マッチするエントリがなければ、タイムアウトするまで待つ。 第2引数に秒単位で待ち時間を指定できる。
注意:連続する read() が同じオブジェクトを返す保証はない。
1: #!/usr/bin/env ruby
2: # mbox-taker.rb -- Take a message from a message box in a tuple space.
3:
4: require 'rinda/tuplespace'
5:
6: def usage()
7: $stderr.printf("Usage: %% %s uri\n", $0)
8: exit( 1 )
9: end
10:
11: def main(argv)
12: if( argv.length != 1 )
13: usage()
14: end
15: uri = argv[0]
16: DRb.start_service()
17: space = DRbObject.new_with_uri( uri )
18:
19: template = ["Message Box",nil]
20: tuple = space.take( template )
21: message = tuple[1]
22: p tuple # for debug
23: printf("mbox-taker: took [%s]\n", message )
24: end
25:
26: main(ARGV)
take() は、read() と同じだが、エントリを空間から取り去る所が異なる。複
数の take() が重なったとしても、エントリは1つにしか取られない。
% diff mbox-reader.rb mbox-taker.rb
2c2
< # mbox-reader.rb -- Read a message from a message box in a tuple space.
---
> # mbox-taker.rb -- Take a message from a message box in a tuple space.
20c20
< tuple = space.read( template )
---
> tuple = space.take( template )
23c23
< printf("mbox-reader: read [%s]\n", message )
---
> printf("mbox-taker: took [%s]\n", message )
%
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/make-space.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/mbox-writer.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/mbox-reader.rb
% wget http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-01-29/ex/mbox-taker.rb
% chmod +x *.rb
% ls -l
total 32
-rwxr-xr-x 1 yas yas 463 2 7 23:42 make-space.rb
-rwxr-xr-x 1 yas yas 485 2 7 23:48 mbox-reader.rb
-rwxr-xr-x 1 yas yas 484 2 8 00:06 mbox-taker.rb
-rwxr-xr-x 1 yas yas 406 2 7 23:40 mbox-writer.rb
% which ruby
/usr/local/bin/ruby
% ruby -v
ruby 1.8.6 (2008-03-03 patchlevel 114) [powerpc-darwin8.11.0]
%
実行には、Ruby の 1.8 以降が必要。drb.rb を含んだもの。
% ./make-space.rb 1231
druby://iris:1231
Type ^C to stop this program.
(最後に ^C で止める)
引数のポート番号は、ぶつからないような番号にする。
自動的に終了しないので、実験が終わったら ^C (Control-C) で殺す。
Writer を動作させる。
% ./mbox-writer.rb druby://iris:1231 hello
mbox-writer: wrote[hello]
% ./mbox-writer.rb druby://iris:1231 hi
mbox-writer: wrote[hi]
%
最後のウインドウで Reader や Taker を動作させる。
% ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"]
mbox-reader: read [hello]
% ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"]
mbox-reader: read [hello]
% ./mbox-reader.rb druby://iris:1231
["Message Box", "hello"]
mbox-reader: read [hello]
% ./mbox-taker.rb druby://iris:1231
["Message Box", "hello"]
mbox-taker: took [hello]
% ./mbox-taker.rb druby://iris:1231
["Message Box", "hi"]
mbox-taker: took [hi]
% ./mbox-taker.rb druby://iris:1231
...
3回目の take は止まる。別ウインドウで write すれば、先に進む。
タプルがない状態先に take/read すると、止まる。この状態で、write すれ ば、read/take が終了する。
次の2つのプログラムを作成する。
(1) 初期化プログラム
String url = argv[0];
tuple = [url,0]
space.write( tuple );
(2) アクセスされた時に動作するプログラム
値を増やす。現在の値を画面に表示する。