並行システム
システム情報系情報工学域,
システム情報工学研究科コンピュータサイエンス専攻
新城 靖
<yas@cs.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2011/2012-02-09
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/
参考文献
R.E. フィルマン, D.P. フリードマン著, 雨宮 真人, 尾内 理紀夫, 高橋 直 久 (訳): "協調型計算システム -- 分散型ソフトウェアの技法と道具立て", マグロウヒル (1986). ISBN-10: 4895010309.
Marko Boger: Java in Distributed Systems: Concurrency, Distribution and Persistence, John Wiley & Sons, 2001. ISBN: 0471498386
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
type procA_t = process(引数・・・);
var
局所データの宣言
procedure proc1(引数・・・);
procedure proc2(引数・・・);
begin
cycle
・・・
end;
end
var procA1 : procA_t ;
init procA1(引数);
type monA_t = monitor(引数・・・);
var
局所データの宣言
procedure entry proc1(引数・・・);
procedure entry proc2(引数・・・);
begin
ローカルデータの初期化;
end
var monA1 : monA_t ;
init monA1(引数);
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。 cv1.signal; 待っているプロセスがいれば全て再開させる。
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。
producer | consumer
2つのスレッドの間には、バッファを置く。
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: wp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
http://portal.acm.org/citation.cfm?doid=359576.359585
Process A:
...
B ? x; -- プロセス B からの通信を待つ。
-- メッセージを受け取り、変数 x へ保存。
B ! x*2; -- プロセス B へ値 x * 2 を送る。
...
Process B:
...
y := 5;
A ! y + 1; -- プロセス A へ値 5 を送る。
...
A ? y; -- プロセス A からの通信を待つ。
-- メッセージを受け取り、変数 y へ保存。
プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。
名前::... ||
□ (正方形ではなく、縦長に書くのが正確。)
Merge:: c:character;
*[ X ? c → Sink ! c
□ Y ? c → Sink ! c
□ Z ? c → Sink ! c ]
1: [ Buffer:: 2: buf(0..bufsize-1): buffer_element; 3: first, last : integer; 4: j,k : integer; 5: first :=0; 6: last :=0; 7: * [ (j: 1..numprod) 8: (last + 1) mod bufsize ≠ first; 9: Producer(j) ? buf(last) → 10: last := (last +1) mod bufsize 11: □ 12: (k: 1..numcons) 13: first ≠ last; 14: Consumer(k) ? more() → 15: Consumer(k) ! buf(first); 16: first := (first + 1) mod bufsize 17: ] 18: || (i:1..numprod) PRODUCER -- 生産者のプログラム 19: || (i:1..numcons) CONSUMER -- 消費者のプログラム 20: ]
仕様の例。1行のバッファ。
task line_block is
entry add( c: in character);
end line_block;
in は、入力パラメタの意味。タスク側から見て入力(受信)。
受け側の例
accept add( c: in character) do
thisline(i) := c;
end add;
呼出し側の例
line_block.add("d");
エントリを呼び出すと、呼び出したタスクは、ブロックされる。
受け入れ側で、accept ... end が終わると、ブロックが解除される。
select
when 条件 =>
accept ... do ... end;
or
when 条件 =>
accept ... do ... end;
or
when 条件 =>
delay seconds;
end
when 条件 =>」は、書かなくてもよい。
1: type line is array (1..120) of character; 2: 3: task line_block is 4: entry add( c: in character); 5: entry plese_print( ln: out line); 6: end line_block; 7: 8: task body line_block is 9: printer_trouble_time : constant integer := 300; 10: print_line, fill_line : line; 11: nextfree : integer; 12: print_ready : boolean; 13: begin 14: nextfree := 1; 15: print_ready := false; 16: loop 17: select 18: when (nextfree < 121 ) => 19: accept add( c: in character) do 20: fill_line(nextfree) := c; 21: end add; 22: nextfree := nextfree + 1; 23: if( next_free = 121 ) and not print_ready then 24: print_ready := true; 25: nextfree := 1; 26: print_line := fill_line; 27: end if; 28: or 29: when print_ready => 30: accept please_print( ln: out line ) do 31: ln := print_line; 32: end please_print; 33: if nextfree = 121 then 34: print_line := fill_line; 35: nextfree := 1; 36: else 37: print_ready := false; 38: end if; 39: or 40: when print_ready and (nextfree > 120) => 41: delay printr_trouble_time; 42: printer_trouble; 43: end select; 44: end loop; 45: end line_block;
プログラミング言語は、ネットワークの存在や通信をさまざまな度合いで隠し ている。
問題:
ネットワークの機能はある。
利用者は、常にどの計算機を使っているのかを意識する必要がある。
コマンド
分散型オペレーティング・システム
利用者は、仮想的な1台の計算機を使っているように感じ、計算機と計算機の 境界線が見えなくなる。分散透明性(network transparency)が実現されてい る。
目標
注意: 負荷分散の分散(load sharing/balancing)とこの講義のタイトルの分散 (distributed)は意味が違う。
A note on Distributed Computing [Waldo 1994]
例:
分散の難しい問題は、まだ汎用的に解ける技術はない。 汎用性がないと、プログラミング言語には採り入れにくい。
一般的な特徴
Emerald の目標。
メソッド呼出しの引数も参照で渡される。オブジェクトの場所が違うと、重た い。コピーの方が速い。
Call by reference が重たい。
http://www.recursionsw.com/products/voyager/voyager-intro.html
import com.objectspace.voyager.*;
public class BallMachine {
public static void main(String[] args) {
try {
Voyager.startup("8000"); // als Server starten
Ball ball = new Ball();
Namespace.bind("EinBall",ball);
} catch( Exception exception ) {
System.err.println( exception );
}
}
}
import com.objectspace.voyager.*;
public class Bat {
public void play(IBall ball) {
System.out.println("Hitting the new Ball");
ball.hit();
}
public static void main(String[] args) {
try {
Voyager.startup(); // als Client starten
Bat bat = new Bat();
IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall");
bat.play(ball);
} catch( Exception exception ) {
System.err.println( exception );
}
Voyager.shutdown();
}
}
IMobility mobileObj = Momility.of(obj);
mobileObj.moveTo("url");
import com.objectspace.voyager.*;
import com.objectspace.voyager.mobility.*;
public class Bat {
public void play(IBall ball, String url) {
try {
ball.hit();
System.out.println("Ball bewegen?");
Mobility.of(ball).moveTo(url);
System.out.println("Ball bewegt");
} catch (MobilityException e) {
System.out.println(e);
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
Voyager.startup("9001");
ClassManager.enableResourceServer();
Bat bat = new Bat();
//Ball newball= new Ball();
IBall ball = (IBall) Proxy.of(new Ball());
bat.play(ball,"//vsyspc5:8000");
System.out.println("Ball 1ste mal gespielt");
bat.play(ball,"//localhost:9001");
} catch( Exception exception ) {
System.err.println( exception );
}
Voyager.shutdown();
}
}
IA a1 = (IA) Factory.create("A","//sun:8000");
a1.method(param1,param2); // 同期
Result r1 = Future.invoke(object, "method", // 非同期
new Object [] {param1,param2});
...
if( result.isAvailable() )
{
int x = r1.readInt();
}
メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable()
メソッドで終了を待つ。readInt(), readByte(), readObject() で、int,
byte, オブジェクトに戻す。
ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace");
A a1 = new A();
subspace.add(a1);
A a2 = new A();
subspace.add(a2);
a1.method1( param1, param2 );
a2.method1( param1, param2 );
Object [] params = new Object [] { param1, param2 };
Multicast.invoke(subspace,"method1",params,"A");
public class Hello implements java.io.Serializable {
public void sayHello(java.lang.String name) {
System.out.println("Hello " + name);
}
}
djeyc コンパイラで、2つの Java のクラスが作られる。
import dejay.base.*;
public class HelloStartup {
public static void main(String args[])
{
try {
DjProcessor p1 = new DjProcessor("//localhost:8000");
DjHello hello1 = new DjHello(p1);
hello1.sayHello("Thorsten");
p1.moveTo("//localhost:9000");
hello1.sayHello("Jan");
hello1.moveTo("//mac:9000");
} catch (ConstructProcessorFailedException e) {
System.out.println("Creating Virtual Processor failed."+e);
}
}
}
DjProcessor p1 = new DjProcessor("hostname:port");
これにより、ホスト hostname 上の
ポート番号 port で新たな
仮想プロセッサが作られる。
ホスト hostname では、事前に
Voyager デーモンを走らせておく必要がある。
class A (.dj) から、自動的に
スタブ(プロキシ)となる
class DjA が生成される。
コンストラクタの引数が1個多い。
DjA a1 = new DjA( Aの引数・・・, DjProcessor );
これで、遠隔の仮想プロセッサでオブジェクトが生成され、
遠隔参照(remote reference)が返される。
通常の参照と同じになるように頑張っているが、
一部違う。
DjA a2 ;
a2 = ... ;
...
if( a2 == a1 ) // 遠隔のオブジェクトではなくてローカルのスタブの比較
{
...
}
equals() メソッドは、遠隔でも働く。
ローカル・オブジェクトを引数にして、 リモート・オブジェクトを呼ぶと、自動的に遠隔にコピーが渡される。 (遠隔オブジェクトが作られて、遠隔参照が渡されるのではない。)
DjX 型のオブジェクトを
遠隔にリターンすることはできる。
DjX がみつからなければ、
Xのコピーが返される。
remote.getCopy() で、ローカルのコピーが得られる。
DjA a1 = new DjA(p1);
A a2 = a1.getCopy();
DjA a1 = new DjA(p1);
B b1 = a.method1( 10 ); // 同期
B b2 = a.method1( 10, Dejey.ASYNC ); // 非同期
a.method1( 10, Dejay.ONEWAY ); // 一方向
非同期には、完了待ちの方法が2種類ある
B b1 = a.method1( 10, Dejey.ASYNC );
b1.method2(); // wait by nesessity
B b2 = a.method1( 10, Dejey.ASYNC );
if( b2.isAvailable() )
{
b2.method2();
}
else
{
// do something
b2.waitForResult();
b2.method2();
}
2レベルの名前。
DjProcessor p1 = new DjProcessor("lin:8000");
p1.registerByName("p1");
DjA a1 = new DjA(p1);
a1.registerByName("a1");
DjProcessor p1 = Dejay.getProcessorByName("p1");
DjA a1 = p1.getObjectByName("a1");
DjProcessor p1 = new DjProcessor("PersistProc","ProcesssorDB","lin:8000");
DjA a1 = new DjA();
if( p1.isPersistable ) {
p1.persist();
p1.flush();
}
persist() を呼ぶと、データベースに保存される。
flush() を呼ぶと、メモリが GC で回収可能になる。
persist() でデータベースに保存されたオブジェクトも、 使われる自動的にデータベースから回復される。
仮想プロセッサ全体がデータベースに保存できる。 次のようにして回復できる。
DjProcessor p1 = Dejay.getProcessorFromDB("PersistProc",
"ProcesssorDB","lin:8000");