並行プログラミング言語

並行システム

                               システム情報工学研究科コンピュータサイエンス専攻、電子・情報工学系
                               新城 靖
                               <yas@is.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/csys-2007/2008-02-15
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/

■連絡事項

■今日の重要な話

参考文献

R.E. フィルマン, D.P. フリードマン著, 雨宮 真人, 尾内 理紀夫, 高橋 直 久 (訳): "協調型計算システム -- 分散型ソフトウェアの技法と道具立て", マグロウヒル (1986). ISBN-10: 4895010309.

Marko Boger: Java in Distributed Systems: Concurrency, Distribution and Persistence, John Wiley & Sons, 2001. ISBN: 0471498386

http://www.dpunkt.de/buch/3-932588-32-0.html (ドイツ語) jivs code

■Concurrent Pascal

2007年12月14日の再掲 逐次型プログラミング言語 Pascal を拡張したもの。

◆参考文献

Per Brinch Hansen: "The Architecture of Concurrent Program", Prentice Hall (1977).

Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).

◆プロセス

type procA_t = process(引数・・・);
   var
      局所データの宣言
   procedure proc1(引数・・・);
   procedure proc2(引数・・・);
begin
   cycle
       ・・・
   end;
end

var procA1 : procA_t ;
init procA1(引数);

◆モニタ

type monA_t = monitor(引数・・・);
   var
      局所データの宣言
   procedure entry proc1(引数・・・);
   procedure entry proc2(引数・・・);
begin
   ローカルデータの初期化;
end

var monA1 : monA_t ;
init monA1(引数);;

◆条件変数とキュー

  cv1 : condition;
  cv1.wait;      呼び出したプロセスを待たせる。
  cv1.signal;   待っているプロセスがいれば全て再開させる。
  q1 : queue;
  delay(q1);      呼び出したプロセスをそのキューで待たせる。
  continue(q1);   そのキューで待っているプロセスがいれば1つだけ再開させる。

◆有限バッファ

Unix のパイプのようなことを Concurrent Pascal のプロセス(スレッド)を使って実行したい。
producer | consumer
2つのスレッドの間には、バッファを置く。

図? 環状バッファ(有限バッファ)、生産者、消費者

図? 環状バッファ(有限バッファ)、生産者、消費者"

バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。

手続き

   1: const BUFFER_SIZE = 4;
   2: type circular_buffer =
   3: monitor
   4: var
   5:     rp : integer ;
   6:     rp : integer ;
   7:     data: array [0..BUFFER_SIZE-1] of integer;
   8:     used: integer;
   9:     not_empty : condition;
  10:     not_full  : condition;
  11: 
  12:     procedure entry put(x:integer);
  13:     begin
  14:         while( used = BUFFER_SIZE ) do
  15:             non_full.wait;
  16:         data[wp] := x;
  17:         wp := wp + 1 ;
  18:         if( wp >= BUFFER_SIZE )
  19:             wp := 0 ;
  20:         used := used + 1 ;
  21:         not_empty.signal;
  22:     end
  23: 
  24:     procedure entry get(result x:integer);
  25:     begin
  26:         while( used = 0 ) then
  27:             not_empty.wait;
  28:         x := data[rp];
  29:         rp := rp + 1 ;
  30:         if( rp >= BUFFER_SIZE )
  31:             rp := 0 ;
  32:         used := used - 1 ;
  33:         not_full.signal;
  34:     end
  35: begin
  36:     rp := 0 ;
  37:     wp := 0 ;
  38:     used := 0 ;
  39: end;
  40: 
  41: ...
  42: var buf : circular_buffer ;
  43: init buf;
  44: ...
  45: 

■Communicating Sequential Processes

C. A. R. Hoare: "Communicating sequential processes", Communications of the ACM, Volume 21 , No.8 pp.666-677, 1978. http://portal.acm.org/citation.cfm?doid=359576.359585

◆通信とプロセス

直接名前付けの通信。通信相手のプロセスを明示的に指定する。 (<−>間接では、メール・ボックスを指定する。)

Process A:

...
B ? x;   -- プロセス B からの通信を待つ。
         -- メッセージを受け取り、変数 x へ保存。
B ! x*2; -- プロセス B へ値 x * 2 を送る。
...

Process B:

...
y := 5;
A ! y + 1; -- プロセス A へ値 5 を送る。
...
A ? y;     -- プロセス A からの通信を待つ。
           -- メッセージを受け取り、変数 y へ保存。

プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。

◆制御構造、ガード付きコマンド、非決定性

ガード → 節 ]
ガード付き節。条件式。 「→」の左がすべて真なら、「→」の右を実行する。
* [ <test> → <action> ]
繰り返し。while test do action <test>の部分は、ガード(guard)と呼ばれる。 ガードには、単純な条件式の他に、入力(受信)が書ける。出力(送信)は書けない。
名前::...
プロセスの名前付け。
||
並行実行。A || B で、プロセス A と B が並行に実行される。
 (正方形ではなく、縦長に書くのが正確。)
alternative command。非決定性の記述。
name ( val1, val2 )
構造体の定義。name が構造体の名前。val1, val2 は値。 フィールドには名前がつけられない。
(x,y) := (y,x)
名前がない構造体の代入。 並行に実行されるので、これで変数のswapが可能。
マクロ名 ≡ 本体
マクロ定義。
プロセスの配列 P(0..n)
意味としては、プロセスがマクロ展開される。 プロセスは添字で区別される。
マージ・プロセスの例。プロセスX, Y, Zのいずれかから送られてきたメッセー ジを受け取り、それをプロセス Sink に送る。
Merge:: c:character;
        *[    X ? c → Sink ! c
           □ Y ? c → Sink ! c
           □ Z ? c → Sink ! c ]

◆有限バッファ

[ Buffer::
    buf(0..bufsize-1): buffer_element;
    first, last : integer;
    j,k         : integer;
    first :=0;
    last :=0;
    * [ (j: 1..numbprod)
        (last + 1) mod bufsize ≠ first;
	Producer(j) ? buf(last) →
	    last := (last +1) mod bufsize
        □
        (k: 1..numcons)
        first ≠ last;
        Consumer(k) ? more() →
	    Consumer(k) ! buf(first);
	    first := (first + 1) mod bufsize
      ]
|| (i:1..numprod) PRODUCER -- 生産者のプログラム
|| (i:1..numcons) CONSUMER -- 消費者のプログラム
]

◆Occam

Occam は、Inmos 社が CSP を元にして開発した言語。 Transputer は、Occam を実行するためのハードウェア。

■Ada

◆タスク

並行に動作するオブジェクト(現在の用語ではスレッド)を実現する。

◆仕様と本体

Adaは、カプセル化を重視した設計になっている。
仕様(specification)
インタフェースの記述。プログラムの他の要素から参照できる。仕様だ けわかれば、分割コンパイルできる。
本体(body)
プログラムのコード、変数など。

◆エントリ

プロセス間通信(タスク間通信)の受け口。仕様には、entry 文で記述する。 本体では、そのエントリへの通信が可能な場所にaccept 文を記述する。

仕様の例。1行のバッファ。

task line_block is
    entry add( c: in character);
end  line_block;
in は、入力パラメタの意味。タスク側から見て入力(受信)。

受け側の例

accept add( c: in character) do
    thisline(i) := c;
end add;

呼出し側の例

    line_block.add("d");
エントリを呼び出すと、呼び出したタスクは、ブロックされる。 受け入れ側で、accept ... end が終わると、ブロックが解除される。

◆ランデブ

プロセス1、プロセス2、ランデブ

図? 対称的なランデブ

呼出し側、accept側、end、ランデブ

図? Adaのランデブ

◆select文

CSP ガード付きコマンドの Ada 版。
select
    when  条件 =>
        accept ... do ... end;
or
    when  条件 =>
        accept ... do ... end;
or
    when  条件 =>
        delay seconds;
end
開いているエントリ
when の条件が真
閉じているエントリ
when の条件が偽

◆ラインプリンタのバッファ

type line is array (1..120) of character;

task line_block is
    entry add( c: in character);
    entry plese_print( ln: out line);
end  line_block;

task body line_block is
    printer_trouble_time  : constant integer := 300;
    print_line, fill_line : line;
    nextfree              : integer;
    print_ready           : boolean;
begin
    nextfree    := 1;
    print_ready := false;
    loop
        select
	    when (nextfree < 121 ) =>
	        accept add( c: in character) do
		    fill_line(nextfree) := c;
		end add;
		nextfree := nextfree + 1;
		if( next_free = 121 ) and not print_ready then
		    print_ready := true;
		    nextfree    := 1;
		    print_line	:= fill_line;
		end if;
	or
	    when print_ready =>
	        accept please_print( ln: out line ) do
		    ln := print_line;
		end please_print;
		if nextfree = 121 then
		    print_line := fill_line;
		    nextfree   := 1;
		else
		    print_ready := false;
		end if;
        or
	    when print_ready and (nextfree > 120) =>
	        delay printr_trouble_time;
		    printer_trouble;
        end select;
    end loop;
end line_block;

■Argus

トランザクション機能を内部に含む言語。 発音は、英語読みでアーガス。ギリシャ神話読みでは、アルゴス。

Barbara Liskov: "Distributed programming in Argus", Communications of the ACM, Vol.31, No.3, pp.300-312, 1988.

◆ガーディアン(guardian)

並行に動作するオブジェクト(現在の用語ではスレッド)を実現する。

◆安定な変数

ガーディアンは、2種類の局所変数を持つ。
安定(stable)
クラッシュ(再起動)しても、生き残る。
揮発(volatile)
クラッシュ(再起動)すると、値が失われる。

◆アクション(action)

安定な変数に対するトランザクションを実現する。安定な変数を、整合性がと れた状態から、次の整合性がとれた状態へ遷移させる。( 途中でクラッシュしたら、元に戻す。)

トップアクション
トップレベルのトランザクションを実現する。commit するか abort するか。 内部にサブアクションを持つ。 abort したら、内部のサブアクションもすべて abort。
サブアクション
ネストしたトランザクションを実現する。
他のガーディアンのハンドラの呼出し(遠隔手続き呼び出し)は、 サブアクションになる。

◆トランザクションの実装

◆ガーディアンの宣言

  1. 名前
  2. 生成ルーチン。引数の取得。
  3. 安定な変数
  4. 揮発的な変数
  5. ハンドラ手続き(外から呼ばれるエントリ)
  6. バックグランド・ルーチン。ガーディアン自身のプログラム。
  7. 復旧手続き。クラッシュ後に実行され、揮発的な変数を状態に戻す。
安定な変数については、システムが自動的に整合性が取れた状態に復旧する。

◆銀行の支店

branch = guardian is create handles total, open, close, deposit, withdraw
  % type definitions
  htable = atomic_array[bucket]
  bucket = atomic_array[pair]
  pair = atomic_record[num: account_number, acct: acct_info]
  wet_info = atomic_record[bal: int]
  account_number = atomic_record[code: string, num: int]
  intcell = atomic_record[val: int]

  stable ht: htable    % the table of accounts
  stable code: string  % the code for the branch
  stable seed: intcell % the seed for generating new account numbers

  create = creator (c: string, size: int) returns (branch)
    code := c
    seed.val := 0
    ht := htable$new()
    for i: int in int$from_to(1, size) do
      htable$addh(ht, bucket$new())
    end
    return (self)
  end create

  total = handler () returns (int)
    sum: int := 0
    for b: bucket in htable$elements(ht) do
      for p: pair in bucket$elements(b) do
	sum := sum + p.acct.bal
      end
    end
    return (sum)
  end total

  open = handler () returns (account_number)
    intcell$write_lock(seed) % get a write lock on the seed
    a: account_number := account_ number${code: code, num: seed.val}
    seed.val := seed.val + 1
    bucket$addh(ht[hash(a.num)], pair${num: a, acct: acct_info${bal: 0}})
    return (a)
  end open

  close = handler (a: account_number) signals (no_such_acct, positive_balance)
    b: bucket := ht[hash(a.num)]
    for i: int in bucket$indexes(b) do
      if b[i].num != a then continue end
      if b[i].acct.bal > 0 then signal positive-balance end
      b[i] := bucket$top(b) % store topmost element in place of closed account
      bucket$remh(b) % discard topmost element
      return
    end
    signal no_such_acct
  end close

  lookup = proc (a: account_number) returns (acct_info) signals (no_such__acct)
    for p: pair in bucket$elements(ht[hash(a.num)]) do
      if p.num = a then return (pacct) end
    end
    signal no_such_acct
    end lookup

  deposit = handler (a: account_number, amt: int) signals (no_such_acct, negativ_amount)
    if amt < 0 then signal negative_amount end
    ainfo: acct_info := lookup(a) resignal no_such_acct
    ainfo.bal := ainfo.bal + amt
  end deposit

  withdraw = handler (a: account_number, amt: int)
    signals (no_such_acct, negative_amount, insufficient_funds)
    if amt < 0 then signal negative amount end
    ainfo: acct_info := lookup(a) resignal no_such_acct
    if ainfo.bal < amt then signal insufficient_funds end
    ainfo.bal := ainfo.bal - amt
  end withdraw
end branch

■分散型プログラミング言語一般論

プログラミング言語は、ネットワークの存在や通信をさまざまな度合いで隠し ている。

問題:

「socket があれば、分散型プログラミング言語」とは、定義したくない。

◆分散型OSとネットワークOS

ネットワーク・オペレーティング・システム

ネットワークの機能はある。

利用者は、常にどの計算機を使っているのかを意識する必要がある。

コマンド

システムコール

分散型オペレーティング・システム

利用者は、仮想的な1台の計算機を使っているように感じ、計算機と計算機の 境界線が見えなくなる。分散透明性(network transparency)が実現されてい る。

目標

現在の技術

注意: 負荷分散の分散(load sharing/balancing)とこの講義のタイトルの分散 (distributed)は意味が違う。

◆ローカルとリモートの統合

A note on Distributed Computing [Waldo 1994]

この方法論にそって開発されたプログラミング言語もあるが、 そのような言語のアプリケーションの範囲は狭い。

◆ローカルのプログラミングと分散のプログラミングの違い

歴史 単に繋ぐだけでは、分散の難しい問題は、解決されない。

◆分散の難しい問題

例:

◆RMI付Javaは、分散型プログラミング言語か

◆分散型プログラミング言語の要件

  1. 位置と型の直行。
  2. 遠隔オブジェクト生成
  3. ローカルとリモートを区別したい時には区別できる
  4. ローカルとリモートを区別したくない時には区別しないでよい
  5. Garbage colection
  6. オブジェクト・マイグレーション
  7. グループ化
  8. 同期呼出しと非同期呼出しの両方の支援
  9. 分散でも、並行性を言語で支援したい

分散の難しい問題は、まだ汎用的に解ける技術はない。 汎用性がないと、プログラミング言語には採り入れにくい。

◆分散型プログラミング言語で分散システムが作れるか

分散OS上で分散システムが走るか。

■Emerald

初期(1980年代)の分散型言語。オブジェクト指向。

一般的な特徴

Emerald の目標。

分散透明性と移動は、矛盾している。

◆Emeraldの参照

Emerald では、オブジェクトのアクセスは、参照(reference)を通じて行われ る。参照は、ローカルもリモートも区別がない。 (分散透明性が実現されている)。

メソッド呼出しの引数も参照で渡される。オブジェクトの場所が違うと、重た い。コピーの方が速い。

◆Emeraldのオブジェクト・マイグレーション

Emerald には効率を挙げるために、オブジェクトを移動(migrate)させる機能 がある。
move X to Y
オブジェクト X を、オブジェクト Y のあるホストに移動。ヒント (システムは実際には移動させないこともある)。
fix X at Y
オブジェクト X を、オブジェクト Y のあるホストに移動して、 さらに動かないように固定する。
unfix X
オブジェクト X を、再び移動可能にする。
refix X at Z
fix されているオブジェクト X を、オブジェクト Z のあるホストに移 動して、さらに動かないように固定する。全体が atomic に行われる。

Call by reference が重たい。

call by move
引数をオブジェクトの方に移動させる。
call by visit
メソッドがリターンすると、オブジェクトも戻ってくる。
call by move return
結果のオブジェクトも戻ってくる。
移動の粒度が重要。オブジェクトのグループ化。 attached キーワードがついていると、いっしょに移動する。

■Voyager

Glue の Graham Glass より作成されたJava 用の分散オブジェクト。(Voyager の方が古い。)

◆Voyagerのオブジェクトの公開


import com.objectspace.voyager.*;

public class BallMachine  {

    public static void main(String[] args) {
		try {
			Voyager.startup("8000"); // als Server starten
			Ball ball = new Ball();
			Namespace.bind("EinBall",ball);
		} catch( Exception exception ) {
			System.err.println( exception );
		}
    }
}

◆Voyagerのオブジェクトの利用

import com.objectspace.voyager.*;

public class Bat {

    public void play(IBall ball) {
		System.out.println("Hitting the new Ball");
		ball.hit();
    }

    public static void main(String[] args) {
		try {
		    Voyager.startup(); // als Client starten
			Bat bat = new Bat();
			IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall");
			bat.play(ball);
		} catch( Exception exception ) {
			System.err.println( exception );
		}
		Voyager.shutdown();
    }  
}

◆Voyagerのリモート・リファレンスと移動

Proxy.of(obj) で、リモートリファレンスを生成する。 Mobility.of(obj)で、移動可能なオブジェクトを動的に生成する。

    IMobility mobileObj = Momility.of(obj);
    mobileObj.moveTo("url");

import com.objectspace.voyager.*;
import com.objectspace.voyager.mobility.*;

public class Bat {

   public void play(IBall ball, String url) {
	  try {
		 ball.hit();
		 System.out.println("Ball bewegen?");
		 Mobility.of(ball).moveTo(url);
		 System.out.println("Ball bewegt");
	  } catch (MobilityException e) {
		 System.out.println(e);
		 e.printStackTrace();
	  }
   }

   public static void main(String[] args) {
      try {
		 Voyager.startup("9001"); 
		 ClassManager.enableResourceServer();
		 Bat bat = new Bat();
		 //Ball newball= new Ball();
		 IBall ball = (IBall) Proxy.of(new Ball());
		 bat.play(ball,"//vsyspc5:8000");
		 System.out.println("Ball 1ste mal gespielt");
		 bat.play(ball,"//localhost:9001");
      } catch( Exception exception ) {
	    System.err.println( exception );
      }
      Voyager.shutdown();
   }
}

◆Voyagerの非同期メソッド呼出し

非同期メソッド呼出しがある。

    IA a1 = (IA) Factory.create("A","//sun:8000");
    a1.method(param1,param2);  // 同期 

    Result r1 = Future.invoke(object, "method", // 非同期
        new Object [] {param1,param2});
...
    if( result.isAvailable() )
    {
       int x = r1.readInt();
    }
メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable() メソッドで終了を待つ。readInt(), readByte(), readObject() で、int, byte, オブジェクトに戻す。

◆Voyagerのマルチキャスト

Voyager Space を使ってマルチキャストができる。
    ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace");

    A a1 = new A();
    subspace.add(a1);

    a1.method1( param1, param2 );

    Object [] params = new Object [] { param1, param2 };
    Multicast.invoke(subspace,"method1",params,"A");

■Dejay

分散Javaの1つ。

◆Dejeyのオブジェクト・マイグレーション

    DjProcessor p1 = new DjProcessor("sun:8000");
    DjHello hello = new DjHello(p1);
    hello.moveTo("mac:9000");

◆DjProcessor

1つ目の仮想プロセッサは、暗黙的に作られる。 2つ目以降は、明示的に作る。
    DjProcessor p1 = new DjProcessor("hostname:port");
これにより、ホスト hostname 上の ポート番号 port で新たな 仮想プロセッサが作られる。 ホスト hostname では、事前に Voyager デーモンを走らせておく必要がある。

◆遠隔オブジェクト

class A (.dj) から、自動的に スタブ(プロキシ)となる class DjA が生成される。 コンストラクタの引数が1個多い。
    DjA a1 = new DjA( Aの引数・・・, DjProcessor );
これで、遠隔の仮想プロセッサでオブジェクトが生成され、 遠隔参照(remote reference)が返される。 通常の参照と同じになるように頑張っているが、 一部違う。

    DjA a2 ;
    a2 = ... ;
    ...
    if( a2 == a1 ) // 遠隔のオブジェクトではなくてローカルのスタブの比較
    {
       ...
    }
equals() メソッドは、遠隔でも働く。

ローカル・オブジェクトを引数にして、 リモート・オブジェクトを呼ぶと、自動的に遠隔にコピーが渡される。 (遠隔オブジェクトが作られて、遠隔参照が渡されるのではない。)

DjX 型のオブジェクトを 遠隔にリターンすることはできる。 DjX がみつからなければ、 Xのコピーが返される。

remote.getCopy() で、ローカルのコピーが得られる。

    DjA a1 = new DjA(p1);
    A a2 = a1.getCopy();
3つのメソッド呼出しのセマンティクスがある。
    DjA a1 = new DjA(p1);
    B b1 = a.method1( 10 );		 // 同期
    B b2 = a.method1( 10, Dejey.ASYNC ); // 非同期
    a.method1( 10, Dejay.ONEWAY );       // 一方向
非同期には、完了待ちの方法が2種類ある
    B b1 = a.method1( 10, Dejey.ASYNC );
    b1.method2(); // wait by nesessity

    B b2 = a.method1( 10, Dejey.ASYNC );
    if( b2.isAvailable() )
    {
        b2.method2();
    }
    else
    {
        // do something
        b2.waitForResult();
        b2.method2();
    }

◆Dejeyの名前サービス

オブジェクトに文字列の名前を付けて、遠隔からアクセスできるように公開す ることができる。

2レベルの名前。

    DjProcessor p1 = new DjProcessor("lin:8000");
    p1.registerByName("p1");
    DjA a1 = new DjA(p1);
    a1.registerByName("a1");
    DjProcessor p1 = Dejay.getProcessorByName("p1");
    DjA a1 = p1.getObjectByName("a1");

◆永続性

分散と永続は、かなり関係している。 Dejay では、仮想プロセッサ単位で永続にできる。 DjProcessor のコンストラクタで、文字列の名前と データベースの名前を取る。
    DjProcessor p1 = new DjProcessor("PersistProc","ProcesssorDB","lin:8000");
    DjA a1 = new DjA();
    if( p1.isPersistable ) {
       p1.persist();
       p1.flush();
    }
persist() を呼ぶと、データベースに保存される。 flush() を呼ぶと、メモリが GC で回収可能になる。

persist() でデータベースに保存されたオブジェクトも、 使われる自動的にデータベースから回復される。

仮想プロセッサ全体がデータベースに保存できる。 次のようにして回復できる。

    DjProcessor p1 = Dejay.getProcessorFromDB("PersistProc",
        "ProcesssorDB","lin:8000");

◆制限


↑[もどる] ←[2月8日] ・[2月15日] →[2月20日]
Last updated: 2008/02/15 11:22:05
Yasushi Shinjo / <yas@is.tsukuba.ac.jp>