並行システム システム情報系情報工学域, システム情報工学研究科コンピュータサイエンス専攻 新城 靖 <yas@cs.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2013/2013-11-26
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/
参考文献 References
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
Per Brinch Hansen: "The programming language Concurrent Pascal," IEEE Transactions on Software Engineering, Vol.SE-1, No.2, pp.199-207, June 1975.
type procA_t = process(args...); var local variables... procedure proc1(args...); procedure proc2(args...); begin cycle ... end; end var procA1 : procA_t ; init procA1(args...);
type monA_t = monitor(args ・・・); var loal variables procedure entry proc1(args ・・・); procedure entry proc2(args ・・・); begin initialization of local variables; end var monA1 : monA_t ; init monA1(引数);ローカル変数は、entry の手続きでしかアクセスできない。 1 つのプロセスが entry の手続きを実行中は、他のプロセスは入れない(mutual exclusion)。
図? Concurrent Pascalのプロセスとモニタ/Processes and a monitor in Concurrent Pascal
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。block the current process. cv1.signal; 待っているプロセスがいれば全て再開させる。unblock all waiting processes.
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 block the current process. continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。 unblock a single process in the queue.
$ producer | consumer
2つのスレッドの間には、バッファを置く。
図? 有限バッファ(環状バッファ)、生産者、消費者 / bounded buffer (circular buffer), producer, consumer
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き procedures
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: wp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
http://portal.acm.org/citation.cfm?doid=359576.359585
Process A:
... B ? x; -- プロセス B からの通信を待つ。 Wait for a message from Process B. -- メッセージを受け取り、変数 x へ保存。 Receive a message and store it into the variable x. B ! x*2; -- プロセス B へ値 x * 2 を送る。Send the value x * 2 to Process B. ...
Process B:
... y := 5; A ! y + 1; -- プロセス A へ値 6 を送る。Send the value 6 to Process A. ... A ? y; -- プロセス A からの通信を待つ。Wait for a message from Process A. -- メッセージを受け取り、変数 y へ保存。Receive a message and store it into the variable y.
プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。
名前::...
||
□
(正方形ではなく、縦長に書くのが正確。)
Merge:: c:character; *[ X ? c → Sink ! c □ Y ? c → Sink ! c □ Z ? c → Sink ! c ]
1: [ Buffer:: 2: buf(0..bufsize-1): buffer_element; 3: first, last : integer; 4: j,k : integer; 5: first :=0; 6: last :=0; 7: * [ (j: 1..numprod) 8: (last + 1) mod bufsize ≠ first; 9: Producer(j) ? buf(last) → 10: last := (last +1) mod bufsize 11: □ 12: (k: 1..numcons) 13: first ≠ last; 14: Consumer(k) ? more() → 15: Consumer(k) ! buf(first); 16: first := (first + 1) mod bufsize 17: ] 18: || (i:1..numprod) PRODUCER -- 生産者のプログラム 19: || (i:1..numcons) CONSUMER -- 消費者のプログラム 20: ]
仕様の例。1行のバッファ。
task line_block is entry add( c: in character); end line_block;in は、入力パラメタの意味。タスク側から見て入力(受信)。
受け側の例
accept add( c: in character) do thisline(i) := c; end add; // i := i + 1;
呼出し側の例
line_block.add("d");エントリを呼び出すと、呼び出したタスクは、ブロックされる。 受け入れ側で、accept ... end が終わると、ブロックが解除される。
図? 対称的なランデブ/ Symmetric rendezvous
図? Adaのランデブ/ Rendezvous in Ada
select when Cond => accept ... do ... end; or when Cond => accept ... do ... end; or when Cond => delay seconds; end
when Cond =>
」は、書かなくてもよい。
1: type line is array (1..120) of character; 2: 3: task line_block is 4: entry add( c: in character); 5: entry plese_print( ln: out line); 6: end line_block; 7: 8: task body line_block is 9: printer_trouble_time : constant integer := 300; 10: print_line, fill_line : line; 11: nextfree : integer; 12: print_ready : boolean; 13: begin 14: nextfree := 1; 15: print_ready := false; 16: loop 17: select 18: when (nextfree < 121 ) => 19: accept add( c: in character) do 20: fill_line(nextfree) := c; 21: end add; 22: nextfree := nextfree + 1; 23: if( next_free = 121 ) and not print_ready then 24: print_ready := true; 25: nextfree := 1; 26: print_line := fill_line; 27: end if; 28: or 29: when print_ready => 30: accept please_print( ln: out line ) do 31: ln := print_line; 32: end please_print; 33: if nextfree = 121 then 34: print_line := fill_line; 35: nextfree := 1; 36: else 37: print_ready := false; 38: end if; 39: or 40: when print_ready and (nextfree > 120) => 41: delay printr_trouble_time; 42: printer_trouble; 43: end select; 44: end loop; 45: end line_block;
プログラミング言語は、ネットワークの存在や通信をさまざまな度合いで隠している。 A programming language hides underlying networks and communication in a degree.
問題 Question:
ネットワークの機能はある。has a networking facility.
利用者は、常にどの計算機を使っているのかを意識する必要がある。 A user is aware of which computer in a network he/she is using.
コマンド commands
分散型オペレーティング・システム a distributed operating system
利用者は、仮想的な1台の計算機を使っているように感じ、計算機と計算機の 境界線が見えなくなる。分散透明性(network transparency)が実現されてい る。 A user thinks that he/she is using a virtual centralized computer. A user cannot see the network boundaries. Network transparency is realized.
目標 goals
注意: 負荷分散の分散(load sharing/balancing)とこの講義のタイトルの分散 (distributed)は意味が違う。
A note on Distributed Computing [Waldo 1994]
例:
分散の難しい問題は、まだ汎用的に解ける技術はない。 汎用性がないと、プログラミング言語には採り入れにくい。
一般的な特徴
Emerald の目標。
メソッド呼出しの引数も参照で渡される。オブジェクトの場所が違うと、重た い。コピーの方が速い。
Call by reference が重たい。
http://www.recursionsw.com/products/voyager/voyager-intro.html
import com.objectspace.voyager.*; public interface IBall { public void hit(); }
import com.objectspace.voyager.*; public class Ball implements IBall { public void hit() { System.out.println("Ball has been hit"); } }
import com.objectspace.voyager.*; public class BallMachine { public static void main(String[] args) { try { Voyager.startup("8000"); // als Server starten Ball ball = new Ball(); Namespace.bind("EinBall",ball); } catch( Exception exception ) { System.err.println( exception ); } } }
import com.objectspace.voyager.*; public class Bat { public void play(IBall ball) { System.out.println("Hitting the new Ball"); ball.hit(); } public static void main(String[] args) { try { Voyager.startup(); // als Client starten Bat bat = new Bat(); IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall"); bat.play(ball); } catch( Exception exception ) { System.err.println( exception ); } Voyager.shutdown(); } }
IMobility mobileObj = Momility.of(obj); mobileObj.moveTo("url");
import com.objectspace.voyager.*; import com.objectspace.voyager.mobility.*; public class Bat { public void play(IBall ball, String url) { try { ball.hit(); System.out.println("Ball bewegen?"); Mobility.of(ball).moveTo(url); System.out.println("Ball bewegt"); } catch (MobilityException e) { System.out.println(e); e.printStackTrace(); } } public static void main(String[] args) { try { Voyager.startup("9001"); ClassManager.enableResourceServer(); Bat bat = new Bat(); //Ball newball= new Ball(); IBall ball = (IBall) Proxy.of(new Ball()); bat.play(ball,"//vsyspc5:8000"); System.out.println("Ball 1ste mal gespielt"); bat.play(ball,"//localhost:9001"); } catch( Exception exception ) { System.err.println( exception ); } Voyager.shutdown(); } }
IA a1 = (IA) Factory.create("A","//sun:8000"); a1.method(param1,param2); // 同期 synchronous Result r1 = Future.invoke(object, "method", // 非同期 asynchronous new Object [] {param1,param2}); ... if( result.isAvailable() ) { int x = r1.readInt(); }メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable() メソッドで終了を待つ。readInt(), readByte(), readObject() で、int, byte, オブジェクトに戻す。
ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace"); A a1 = new A(); subspace.add(a1); A a2 = new A(); subspace.add(a2); a1.method1( param1, param2 ); a2.method1( param1, param2 ); Object [] params = new Object [] { param1, param2 }; Multicast.invoke(subspace,"method1",params,"A");
public class Hello implements java.io.Serializable { public void sayHello(java.lang.String name) { System.out.println("Hello " + name); } }djayc コンパイラで、2つの Java のクラスが作られる。 The djayc compiler generates two Java class files.
import dejay.base.*; public class HelloStartup { public static void main(String args[]) { try { DjProcessor p1 = new DjProcessor("//localhost:8000"); DjHello hello1 = new DjHello(p1); hello1.sayHello("Thorsten"); p1.moveTo("//localhost:9000"); hello1.sayHello("Jan"); hello1.moveTo("//mac:9000"); } catch (ConstructProcessorFailedException e) { System.out.println("Creating Virtual Processor failed."+e); } } }
DjProcessor p1 = new DjProcessor("hostname:port");これにより、ホスト
hostname
上の
ポート番号 port
で新たな
仮想プロセッサが作られる。
ホスト hostname
では、事前に
Voyager デーモンを走らせておく必要がある。
class A
(.dj) から、自動的に
スタブ(プロキシ)となる
class DjA
が生成される。
コンストラクタの引数が1個多い。
DjA a1 = new DjA( Aの引数・・・, DjProcessor );これで、遠隔の仮想プロセッサでオブジェクトが生成され、 遠隔参照(remote reference)が返される。 通常の参照と同じになるように頑張っているが、 一部違う。
DjA a2 ; a2 = ... ; ... if( a2 == a1 ) // 遠隔のオブジェクトではなくてローカルのスタブの比較 { ... }equals() メソッドは、遠隔でも働く。
ローカル・オブジェクトを引数にして、 リモート・オブジェクトを呼ぶと、自動的に遠隔にコピーが渡される。 (遠隔オブジェクトが作られて、遠隔参照が渡されるのではない。)
DjX
型のオブジェクトを
遠隔にリターンすることはできる。
DjX
がみつからなければ、
X
のコピーが返される。
remote.getCopy() で、ローカルのコピーが得られる。
DjA a1 = new DjA(p1); A a2 = a1.getCopy();
DjA a1 = new DjA(p1); B b1 = a.method1( 10 ); // 同期 synchronous B b2 = a.method1( 10, Dejay.ASYNC ); // 非同期 asynchronous a.method1( 10, Dejay.ONEWAY ); // 一方向 one-way非同期には、完了待ちの方法が2種類ある。waiting for completion of asynchronous invocations
B b1 = a.method1( 10, Dejay.ASYNC ); b1.method2(); // wait by nesessity B b2 = a.method1( 10, Dejay.ASYNC ); if( b2.isAvailable() ) { b2.method2(); } else { // do something b2.waitForResult(); b2.method2(); }
2レベルの名前。
DjProcessor p1 = new DjProcessor("lin:8000"); p1.registerByName("p1"); DjA a1 = new DjA(p1); a1.registerByName("a1");
DjProcessor p1 = Dejay.getProcessorByName("p1"); DjA a1 = p1.getObjectByName("a1");
DjProcessor p1 = new DjProcessor("PersistProc","ProcesssorDB","lin:8000"); DjA a1 = new DjA(); if( p1.isPersistable ) { p1.persist(); p1.flush(); }persist() を呼ぶと、データベースに保存される。 flush() を呼ぶと、メモリが GC で回収可能になる。
persist() でデータベースに保存されたオブジェクトも、 使われる自動的にデータベースから回復される。
仮想プロセッサ全体がデータベースに保存できる。 次のようにして回復できる。
DjProcessor p1 = Dejay.getProcessorFromDB("PersistProc", "ProcesssorDB","lin:8000");
Choose one idea that is provided by both CSP and Ada. Describe this idea briefly.
Glue, which realizes XML Web services, and Java Remote Method Invocation (RMI) provide Remote Procedure Call (RPC). Choose two advantages of Voyager over Glue and Java RMI. Describe these advantages briefly.
Choose an advantage of Dejay over Voyager. Describe this advantage briefly.