並行システム
                               システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系
                               新城 靖
                               <yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
	http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-15
あるいは、次のページから手繰っていくこともできます。
	http://www.cs.tsukuba.ac.jp/~yas/sie/
	http://www.cs.tsukuba.ac.jp/~yas/
参考文献
R.E. フィルマン, D.P. フリードマン著, 雨宮 真人, 尾内 理紀夫, 高橋 直 久 (訳): "協調型計算システム -- 分散型ソフトウェアの技法と道具立て", マグロウヒル (1986). ISBN-10: 4895010309.
Marko Boger: Java in Distributed Systems: Concurrency, Distribution and Persistence, John Wiley & Sons, 2001. ISBN: 0471498386
http://www.dpunkt.de/buch/3-932588-32-0.html (ドイツ語)
jivs code
Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).
type procA_t = process(引数・・・);
   var
      局所データの宣言
   procedure proc1(引数・・・);
   procedure proc2(引数・・・);
begin
   cycle
       ・・・
   end;
end
var procA1 : procA_t ;
init procA1(引数);
type monA_t = monitor(引数・・・);
   var
      局所データの宣言
   procedure entry proc1(引数・・・);
   procedure entry proc2(引数・・・);
begin
   ローカルデータの初期化;
end
var monA1 : monA_t ;
init monA1(引数);;
cv1 : condition; cv1.wait; 呼び出したプロセスを待たせる。 cv1.signal; 待っているプロセスがいれば全て再開させる。
q1 : queue; delay(q1); 呼び出したプロセスをそのキューで待たせる。 continue(q1); そのキューで待っているプロセスがいれば1つだけ再開させる。
producer | consumer2つのスレッドの間には、バッファを置く。
バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。
手続き
1: const BUFFER_SIZE = 4; 2: type circular_buffer = 3: monitor 4: var 5: rp : integer ; 6: rp : integer ; 7: data: array [0..BUFFER_SIZE-1] of integer; 8: used: integer; 9: not_empty : condition; 10: not_full : condition; 11: 12: procedure entry put(x:integer); 13: begin 14: while( used = BUFFER_SIZE ) do 15: non_full.wait; 16: data[wp] := x; 17: wp := wp + 1 ; 18: if( wp >= BUFFER_SIZE ) 19: wp := 0 ; 20: used := used + 1 ; 21: not_empty.signal; 22: end 23: 24: procedure entry get(result x:integer); 25: begin 26: while( used = 0 ) then 27: not_empty.wait; 28: x := data[rp]; 29: rp := rp + 1 ; 30: if( rp >= BUFFER_SIZE ) 31: rp := 0 ; 32: used := used - 1 ; 33: not_full.signal; 34: end 35: begin 36: rp := 0 ; 37: wp := 0 ; 38: used := 0 ; 39: end; 40: 41: ... 42: var buf : circular_buffer ; 43: init buf; 44: ... 45:
http://portal.acm.org/citation.cfm?doid=359576.359585
Process A:
...
B ? x;   -- プロセス B からの通信を待つ。
         -- メッセージを受け取り、変数 x へ保存。
B ! x*2; -- プロセス B へ値 x * 2 を送る。
...
Process B:
...
y := 5;
A ! y + 1; -- プロセス A へ値 5 を送る。
...
A ? y;     -- プロセス A からの通信を待つ。
           -- メッセージを受け取り、変数 y へ保存。
プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。
名前::...  ||
□ (正方形ではなく、縦長に書くのが正確。)
Merge:: c:character;
        *[    X ? c → Sink ! c
           □ Y ? c → Sink ! c
           □ Z ? c → Sink ! c ]
[ Buffer::
    buf(0..bufsize-1): buffer_element;
    first, last : integer;
    j,k         : integer;
    first :=0;
    last :=0;
    * [ (j: 1..numbprod)
        (last + 1) mod bufsize ≠ first;
	Producer(j) ? buf(last) →
	    last := (last +1) mod bufsize
        □
        (k: 1..numcons)
        first ≠ last;
        Consumer(k) ? more() →
	    Consumer(k) ! buf(first);
	    first := (first + 1) mod bufsize
      ]
|| (i:1..numprod) PRODUCER -- 生産者のプログラム
|| (i:1..numcons) CONSUMER -- 消費者のプログラム
]
仕様の例。1行のバッファ。
task line_block is
    entry add( c: in character);
end  line_block;
in は、入力パラメタの意味。タスク側から見て入力(受信)。
受け側の例
accept add( c: in character) do
    thisline(i) := c;
end add;
呼出し側の例
    line_block.add("d");
エントリを呼び出すと、呼び出したタスクは、ブロックされる。
受け入れ側で、accept ... end が終わると、ブロックが解除される。
select
    when  条件 =>
        accept ... do ... end;
or
    when  条件 =>
        accept ... do ... end;
or
    when  条件 =>
        delay seconds;
end
when  条件 =>」は、書かなくてもよい。
type line is array (1..120) of character;
task line_block is
    entry add( c: in character);
    entry plese_print( ln: out line);
end  line_block;
task body line_block is
    printer_trouble_time  : constant integer := 300;
    print_line, fill_line : line;
    nextfree              : integer;
    print_ready           : boolean;
begin
    nextfree    := 1;
    print_ready := false;
    loop
        select
	    when (nextfree < 121 ) =>
	        accept add( c: in character) do
		    fill_line(nextfree) := c;
		end add;
		nextfree := nextfree + 1;
		if( next_free = 121 ) and not print_ready then
		    print_ready := true;
		    nextfree    := 1;
		    print_line	:= fill_line;
		end if;
	or
	    when print_ready =>
	        accept please_print( ln: out line ) do
		    ln := print_line;
		end please_print;
		if nextfree = 121 then
		    print_line := fill_line;
		    nextfree   := 1;
		else
		    print_ready := false;
		end if;
        or
	    when print_ready and (nextfree > 120) =>
	        delay printr_trouble_time;
		    printer_trouble;
        end select;
    end loop;
end line_block;
Barbara Liskov: "Distributed programming in Argus", Communications of the ACM, Vol.31, No.3, pp.300-312, 1988.
branch = guardian is create handles total, open, close, deposit, withdraw
  % type definitions
  htable = atomic_array[bucket]
  bucket = atomic_array[pair]
  pair = atomic_record[num: account_number, acct: acct_info]
  wet_info = atomic_record[bal: int]
  account_number = atomic_record[code: string, num: int]
  intcell = atomic_record[val: int]
  stable ht: htable    % the table of accounts
  stable code: string  % the code for the branch
  stable seed: intcell % the seed for generating new account numbers
  create = creator (c: string, size: int) returns (branch)
    code := c
    seed.val := 0
    ht := htable$new()
    for i: int in int$from_to(1, size) do
      htable$addh(ht, bucket$new())
    end
    return (self)
  end create
  total = handler () returns (int)
    sum: int := 0
    for b: bucket in htable$elements(ht) do
      for p: pair in bucket$elements(b) do
	sum := sum + p.acct.bal
      end
    end
    return (sum)
  end total
  open = handler () returns (account_number)
    intcell$write_lock(seed) % get a write lock on the seed
    a: account_number := account_ number${code: code, num: seed.val}
    seed.val := seed.val + 1
    bucket$addh(ht[hash(a.num)], pair${num: a, acct: acct_info${bal: 0}})
    return (a)
  end open
  close = handler (a: account_number) signals (no_such_acct, positive_balance)
    b: bucket := ht[hash(a.num)]
    for i: int in bucket$indexes(b) do
      if b[i].num != a then continue end
      if b[i].acct.bal > 0 then signal positive-balance end
      b[i] := bucket$top(b) % store topmost element in place of closed account
      bucket$remh(b) % discard topmost element
      return
    end
    signal no_such_acct
  end close
  lookup = proc (a: account_number) returns (acct_info) signals (no_such__acct)
    for p: pair in bucket$elements(ht[hash(a.num)]) do
      if p.num = a then return (pacct) end
    end
    signal no_such_acct
    end lookup
  deposit = handler (a: account_number, amt: int) signals (no_such_acct, negativ_amount)
    if amt < 0 then signal negative_amount end
    ainfo: acct_info := lookup(a) resignal no_such_acct
    ainfo.bal := ainfo.bal + amt
  end deposit
  withdraw = handler (a: account_number, amt: int)
    signals (no_such_acct, negative_amount, insufficient_funds)
    if amt < 0 then signal negative amount end
    ainfo: acct_info := lookup(a) resignal no_such_acct
    if ainfo.bal < amt then signal insufficient_funds end
    ainfo.bal := ainfo.bal - amt
  end withdraw
end branch
プログラミング言語は、ネットワークの存在や通信をさまざまな度合いで隠し ている。
問題:
ネットワークの機能はある。
利用者は、常にどの計算機を使っているのかを意識する必要がある。
コマンド
分散型オペレーティング・システム
利用者は、仮想的な1台の計算機を使っているように感じ、計算機と計算機の 境界線が見えなくなる。分散透明性(network transparency)が実現されてい る。
目標
注意: 負荷分散の分散(load sharing/balancing)とこの講義のタイトルの分散 (distributed)は意味が違う。
A note on Distributed Computing [Waldo 1994]
例:
分散の難しい問題は、まだ汎用的に解ける技術はない。 汎用性がないと、プログラミング言語には採り入れにくい。
一般的な特徴
Emerald の目標。
メソッド呼出しの引数も参照で渡される。オブジェクトの場所が違うと、重た い。コピーの方が速い。
Call by reference が重たい。
import com.objectspace.voyager.*;
public class BallMachine  {
    public static void main(String[] args) {
		try {
			Voyager.startup("8000"); // als Server starten
			Ball ball = new Ball();
			Namespace.bind("EinBall",ball);
		} catch( Exception exception ) {
			System.err.println( exception );
		}
    }
}
import com.objectspace.voyager.*;
public class Bat {
    public void play(IBall ball) {
		System.out.println("Hitting the new Ball");
		ball.hit();
    }
    public static void main(String[] args) {
		try {
		    Voyager.startup(); // als Client starten
			Bat bat = new Bat();
			IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall");
			bat.play(ball);
		} catch( Exception exception ) {
			System.err.println( exception );
		}
		Voyager.shutdown();
    }  
}
    IMobility mobileObj = Momility.of(obj);
    mobileObj.moveTo("url");
import com.objectspace.voyager.*;
import com.objectspace.voyager.mobility.*;
public class Bat {
   public void play(IBall ball, String url) {
	  try {
		 ball.hit();
		 System.out.println("Ball bewegen?");
		 Mobility.of(ball).moveTo(url);
		 System.out.println("Ball bewegt");
	  } catch (MobilityException e) {
		 System.out.println(e);
		 e.printStackTrace();
	  }
   }
   public static void main(String[] args) {
      try {
		 Voyager.startup("9001"); 
		 ClassManager.enableResourceServer();
		 Bat bat = new Bat();
		 //Ball newball= new Ball();
		 IBall ball = (IBall) Proxy.of(new Ball());
		 bat.play(ball,"//vsyspc5:8000");
		 System.out.println("Ball 1ste mal gespielt");
		 bat.play(ball,"//localhost:9001");
      } catch( Exception exception ) {
	    System.err.println( exception );
      }
      Voyager.shutdown();
   }
}
    IA a1 = (IA) Factory.create("A","//sun:8000");
    a1.method(param1,param2);  // 同期 
    Result r1 = Future.invoke(object, "method", // 非同期
        new Object [] {param1,param2});
...
    if( result.isAvailable() )
    {
       int x = r1.readInt();
    }
メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable() 
メソッドで終了を待つ。readInt(), readByte(), readObject() で、int,
byte, オブジェクトに戻す。
    ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace");
    A a1 = new A();
    subspace.add(a1);
    a1.method1( param1, param2 );
    Object [] params = new Object [] { param1, param2 };
    Multicast.invoke(subspace,"method1",params,"A");
    DjProcessor p1 = new DjProcessor("sun:8000");
    DjHello hello = new DjHello(p1);
    hello.moveTo("mac:9000");
    DjProcessor p1 = new DjProcessor("hostname:port");
これにより、ホスト hostname 上の
ポート番号  port で新たな
仮想プロセッサが作られる。
ホスト hostname では、事前に
Voyager デーモンを走らせておく必要がある。
class A (.dj) から、自動的に
スタブ(プロキシ)となる
class DjA が生成される。
コンストラクタの引数が1個多い。
    DjA a1 = new DjA( Aの引数・・・, DjProcessor );
これで、遠隔の仮想プロセッサでオブジェクトが生成され、
遠隔参照(remote reference)が返される。
通常の参照と同じになるように頑張っているが、
一部違う。
    DjA a2 ;
    a2 = ... ;
    ...
    if( a2 == a1 ) // 遠隔のオブジェクトではなくてローカルのスタブの比較
    {
       ...
    }
equals() メソッドは、遠隔でも働く。
ローカル・オブジェクトを引数にして、 リモート・オブジェクトを呼ぶと、自動的に遠隔にコピーが渡される。 (遠隔オブジェクトが作られて、遠隔参照が渡されるのではない。)
DjX 型のオブジェクトを
遠隔にリターンすることはできる。
DjX がみつからなければ、
Xのコピーが返される。
remote.getCopy() で、ローカルのコピーが得られる。
    DjA a1 = new DjA(p1);
    A a2 = a1.getCopy();
3つのメソッド呼出しのセマンティクスがある。
    DjA a1 = new DjA(p1);
    B b1 = a.method1( 10 );		 // 同期
    B b2 = a.method1( 10, Dejey.ASYNC ); // 非同期
    a.method1( 10, Dejay.ONEWAY );       // 一方向
非同期には、完了待ちの方法が2種類ある
    B b1 = a.method1( 10, Dejey.ASYNC );
    b1.method2(); // wait by nesessity
    B b2 = a.method1( 10, Dejey.ASYNC );
    if( b2.isAvailable() )
    {
        b2.method2();
    }
    else
    {
        // do something
        b2.waitForResult();
        b2.method2();
    }
2レベルの名前。
    DjProcessor p1 = new DjProcessor("lin:8000");
    p1.registerByName("p1");
    DjA a1 = new DjA(p1);
    a1.registerByName("a1");
    DjProcessor p1 = Dejay.getProcessorByName("p1");
    DjA a1 = p1.getObjectByName("a1");
    DjProcessor p1 = new DjProcessor("PersistProc","ProcesssorDB","lin:8000");
    DjA a1 = new DjA();
    if( p1.isPersistable ) {
       p1.persist();
       p1.flush();
    }
persist() を呼ぶと、データベースに保存される。
flush() を呼ぶと、メモリが GC で回収可能になる。
persist() でデータベースに保存されたオブジェクトも、 使われる自動的にデータベースから回復される。
仮想プロセッサ全体がデータベースに保存できる。 次のようにして回復できる。
    DjProcessor p1 = Dejay.getProcessorFromDB("PersistProc",
        "ProcesssorDB","lin:8000");