並行システム システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系 新城 靖 <yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2009/2010-02-05
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/
参考文献
R.E. フィルマン, D.P. フリードマン著, 雨宮 真人, 尾内 理紀夫, 高橋 直 久 (訳): "協調型計算システム -- 分散型ソフトウェアの技法と道具立て", マグロウヒル (1986). ISBN-10: 4895010309.
Marko Boger: Java in Distributed Systems: Concurrency, Distribution and Persistence, John Wiley & Sons, 2001. ISBN: 0471498386
逐次型プログラミング言語 Pascal を拡張したもの。
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
type procA_t = process(引数・・・); var 局所データの宣言 procedure proc1(引数・・・); procedure proc2(引数・・・); begin cycle ・・・ end; end var procA1 : procA_t ; init procA1(引数);
type monA_t = monitor(引数・・・); var 局所データの宣言 procedure entry proc1(引数・・・); procedure entry proc2(引数・・・); begin ローカルデータの初期化; end var monA1 : monA_t ; init monA1(引数);;
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。 cv1.signal; 待っているプロセスがいれば全て再開させる。
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。
producer | consumer
2つのスレッドの間には、バッファを置く。
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: rp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
http://portal.acm.org/citation.cfm?doid=359576.359585
Process A:
... B ? x; -- プロセス B からの通信を待つ。 -- メッセージを受け取り、変数 x へ保存。 B ! x*2; -- プロセス B へ値 x * 2 を送る。 ...
Process B:
... y := 5; A ! y + 1; -- プロセス A へ値 5 を送る。 ... A ? y; -- プロセス A からの通信を待つ。 -- メッセージを受け取り、変数 y へ保存。
プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。
名前::...
||
□
(正方形ではなく、縦長に書くのが正確。)
Merge:: c:character; *[ X ? c → Sink ! c □ Y ? c → Sink ! c □ Z ? c → Sink ! c ]
[ Buffer:: buf(0..bufsize-1): buffer_element; first, last : integer; j,k : integer; first :=0; last :=0; * [ (j: 1..numbprod) (last + 1) mod bufsize ≠ first; Producer(j) ? buf(last) → last := (last +1) mod bufsize □ (k: 1..numcons) first ≠ last; Consumer(k) ? more() → Consumer(k) ! buf(first); first := (first + 1) mod bufsize ] || (i:1..numprod) PRODUCER -- 生産者のプログラム || (i:1..numcons) CONSUMER -- 消費者のプログラム ]
仕様の例。1行のバッファ。
task line_block is entry add( c: in character); end line_block;in は、入力パラメタの意味。タスク側から見て入力(受信)。
受け側の例
accept add( c: in character) do thisline(i) := c; end add;
呼出し側の例
line_block.add("d");エントリを呼び出すと、呼び出したタスクは、ブロックされる。 受け入れ側で、accept ... end が終わると、ブロックが解除される。
select when 条件 => accept ... do ... end; or when 条件 => accept ... do ... end; or when 条件 => delay seconds; end
when 条件 =>
」は、書かなくてもよい。
type line is array (1..120) of character; task line_block is entry add( c: in character); entry plese_print( ln: out line); end line_block; task body line_block is printer_trouble_time : constant integer := 300; print_line, fill_line : line; nextfree : integer; print_ready : boolean; begin nextfree := 1; print_ready := false; loop select when (nextfree < 121 ) => accept add( c: in character) do fill_line(nextfree) := c; end add; nextfree := nextfree + 1; if( next_free = 121 ) and not print_ready then print_ready := true; nextfree := 1; print_line := fill_line; end if; or when print_ready => accept please_print( ln: out line ) do ln := print_line; end please_print; if nextfree = 121 then print_line := fill_line; nextfree := 1; else print_ready := false; end if; or when print_ready and (nextfree > 120) => delay printr_trouble_time; printer_trouble; end select; end loop; end line_block;
プログラミング言語は、ネットワークの存在や通信をさまざまな度合いで隠し ている。
問題:
ネットワークの機能はある。
利用者は、常にどの計算機を使っているのかを意識する必要がある。
コマンド
分散型オペレーティング・システム
利用者は、仮想的な1台の計算機を使っているように感じ、計算機と計算機の 境界線が見えなくなる。分散透明性(network transparency)が実現されてい る。
目標
注意: 負荷分散の分散(load sharing/balancing)とこの講義のタイトルの分散 (distributed)は意味が違う。
A note on Distributed Computing [Waldo 1994]
例:
分散の難しい問題は、まだ汎用的に解ける技術はない。 汎用性がないと、プログラミング言語には採り入れにくい。
一般的な特徴
Emerald の目標。
メソッド呼出しの引数も参照で渡される。オブジェクトの場所が違うと、重た い。コピーの方が速い。
Call by reference が重たい。
import com.objectspace.voyager.*; public class BallMachine { public static void main(String[] args) { try { Voyager.startup("8000"); // als Server starten Ball ball = new Ball(); Namespace.bind("EinBall",ball); } catch( Exception exception ) { System.err.println( exception ); } } }
import com.objectspace.voyager.*; public class Bat { public void play(IBall ball) { System.out.println("Hitting the new Ball"); ball.hit(); } public static void main(String[] args) { try { Voyager.startup(); // als Client starten Bat bat = new Bat(); IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall"); bat.play(ball); } catch( Exception exception ) { System.err.println( exception ); } Voyager.shutdown(); } }
IMobility mobileObj = Momility.of(obj); mobileObj.moveTo("url");
import com.objectspace.voyager.*; import com.objectspace.voyager.mobility.*; public class Bat { public void play(IBall ball, String url) { try { ball.hit(); System.out.println("Ball bewegen?"); Mobility.of(ball).moveTo(url); System.out.println("Ball bewegt"); } catch (MobilityException e) { System.out.println(e); e.printStackTrace(); } } public static void main(String[] args) { try { Voyager.startup("9001"); ClassManager.enableResourceServer(); Bat bat = new Bat(); //Ball newball= new Ball(); IBall ball = (IBall) Proxy.of(new Ball()); bat.play(ball,"//vsyspc5:8000"); System.out.println("Ball 1ste mal gespielt"); bat.play(ball,"//localhost:9001"); } catch( Exception exception ) { System.err.println( exception ); } Voyager.shutdown(); } }
IA a1 = (IA) Factory.create("A","//sun:8000"); a1.method(param1,param2); // 同期 Result r1 = Future.invoke(object, "method", // 非同期 new Object [] {param1,param2}); ... if( result.isAvailable() ) { int x = r1.readInt(); }メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable() メソッドで終了を待つ。readInt(), readByte(), readObject() で、int, byte, オブジェクトに戻す。
ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace"); A a1 = new A(); subspace.add(a1); a1.method1( param1, param2 ); Object [] params = new Object [] { param1, param2 }; Multicast.invoke(subspace,"method1",params,"A");
public class Hello implements java.io.Serializable { public void sayHello(java.lang.String name) { System.out.println("Hello " + name); } }djeyc コンパイラで、2つの Java のクラスが作られる。
import dejay.base.*; public class HelloStartup { public static void main(String args[]) { try { DjProcessor p1 = new DjProcessor("//localhost:8000"); DjHello hello1 = new DjHello(p1); hello1.sayHello("Thorsten"); p1.moveTo("//localhost:9000"); hello1.sayHello("Jan"); hello1.moveTo("//mac:9000"); } catch (ConstructProcessorFailedException e) { System.out.println("Creating Virtual Processor failed."+e); } } }
DjProcessor p1 = new DjProcessor("hostname:port");これにより、ホスト
hostname
上の
ポート番号 port
で新たな
仮想プロセッサが作られる。
ホスト hostname
では、事前に
Voyager デーモンを走らせておく必要がある。
class A
(.dj) から、自動的に
スタブ(プロキシ)となる
class DjA
が生成される。
コンストラクタの引数が1個多い。
DjA a1 = new DjA( Aの引数・・・, DjProcessor );これで、遠隔の仮想プロセッサでオブジェクトが生成され、 遠隔参照(remote reference)が返される。 通常の参照と同じになるように頑張っているが、 一部違う。
DjA a2 ; a2 = ... ; ... if( a2 == a1 ) // 遠隔のオブジェクトではなくてローカルのスタブの比較 { ... }equals() メソッドは、遠隔でも働く。
ローカル・オブジェクトを引数にして、 リモート・オブジェクトを呼ぶと、自動的に遠隔にコピーが渡される。 (遠隔オブジェクトが作られて、遠隔参照が渡されるのではない。)
DjX
型のオブジェクトを
遠隔にリターンすることはできる。
DjX
がみつからなければ、
X
のコピーが返される。
remote.getCopy() で、ローカルのコピーが得られる。
DjA a1 = new DjA(p1); A a2 = a1.getCopy();
DjA a1 = new DjA(p1); B b1 = a.method1( 10 ); // 同期 B b2 = a.method1( 10, Dejey.ASYNC ); // 非同期 a.method1( 10, Dejay.ONEWAY ); // 一方向非同期には、完了待ちの方法が2種類ある
B b1 = a.method1( 10, Dejey.ASYNC ); b1.method2(); // wait by nesessity B b2 = a.method1( 10, Dejey.ASYNC ); if( b2.isAvailable() ) { b2.method2(); } else { // do something b2.waitForResult(); b2.method2(); }
2レベルの名前。
DjProcessor p1 = new DjProcessor("lin:8000"); p1.registerByName("p1"); DjA a1 = new DjA(p1); a1.registerByName("a1");
DjProcessor p1 = Dejay.getProcessorByName("p1"); DjA a1 = p1.getObjectByName("a1");
DjProcessor p1 = new DjProcessor("PersistProc","ProcesssorDB","lin:8000"); DjA a1 = new DjA(); if( p1.isPersistable ) { p1.persist(); p1.flush(); }persist() を呼ぶと、データベースに保存される。 flush() を呼ぶと、メモリが GC で回収可能になる。
persist() でデータベースに保存されたオブジェクトも、 使われる自動的にデータベースから回復される。
仮想プロセッサ全体がデータベースに保存できる。 次のようにして回復できる。
DjProcessor p1 = Dejay.getProcessorFromDB("PersistProc", "ProcesssorDB","lin:8000");