並行プログラミング言語(1)

並行システム

                               システム情報系情報工学域,
			       システム情報工学研究科コンピュータサイエンス専攻
                               新城 靖
                               <yas@cs.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2011/2012-02-09
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/

■今日の重要な話

参考文献

R.E. フィルマン, D.P. フリードマン著, 雨宮 真人, 尾内 理紀夫, 高橋 直 久 (訳): "協調型計算システム -- 分散型ソフトウェアの技法と道具立て", マグロウヒル (1986). ISBN-10: 4895010309.

Marko Boger: Java in Distributed Systems: Concurrency, Distribution and Persistence, John Wiley & Sons, 2001. ISBN: 0471498386

■Concurrent Pascal

2011年12月08日の再掲 逐次型プログラミング言語 Pascal を拡張したもの。

◆参考文献

Per Brinch Hansen: "The Architecture of Concurrent Program", Prentice Hall (1977).

Per Brinch Hansen著, 田中英彦訳: "並行動作プログラムの構造", 日本コン ピュータ協会 (1980).

◆プロセス

type procA_t = process(引数・・・);
   var
      局所データの宣言
   procedure proc1(引数・・・);
   procedure proc2(引数・・・);
begin
   cycle
       ・・・
   end;
end

var procA1 : procA_t ;
init procA1(引数);

◆モニタ

type monA_t = monitor(引数・・・);
   var
      局所データの宣言
   procedure entry proc1(引数・・・);
   procedure entry proc2(引数・・・);
begin
   ローカルデータの初期化;
end

var monA1 : monA_t ;
init monA1(引数);

◆条件変数とキュー

  cv1 : condition;
  cv1.wait;      呼び出したプロセスを待たせる。
  cv1.signal;   待っているプロセスがいれば全て再開させる。
  q1 : queue;
  delay(q1);      呼び出したプロセスをそのキューで待たせる。
  continue(q1);   そのキューで待っているプロセスがいれば1つだけ再開させる。

◆有限バッファ

Unix のパイプのようなことを Concurrent Pascal のプロセス(スレッド)を使って実行したい。
producer | consumer
2つのスレッドの間には、バッファを置く。

図? 環状バッファ(有限バッファ)、生産者、消費者

図? 環状バッファ(有限バッファ)、生産者、消費者"

バッファが空の時、consumer() は、producer() が何かデータをバッ ファに入れるのを待つ。バッファがいっぱいの時、producer() は、 consumer() がバッファから何かデータを取り出すのを待つ。

手続き

   1: const BUFFER_SIZE = 4;
   2: type circular_buffer =
   3: monitor
   4: var
   5:     rp : integer ;
   6:     wp : integer ;
   7:     data: array [0..BUFFER_SIZE-1] of integer;
   8:     used: integer;
   9:     not_empty : condition;
  10:     not_full  : condition;
  11: 
  12:     procedure entry put(x:integer);
  13:     begin
  14:         while( used = BUFFER_SIZE ) do
  15:             non_full.wait;
  16:         data[wp] := x;
  17:         wp := wp + 1 ;
  18:         if( wp >= BUFFER_SIZE )
  19:             wp := 0 ;
  20:         used := used + 1 ;
  21:         not_empty.signal;
  22:     end
  23: 
  24:     procedure entry get(result x:integer);
  25:     begin
  26:         while( used = 0 ) then
  27:             not_empty.wait;
  28:         x := data[rp];
  29:         rp := rp + 1 ;
  30:         if( rp >= BUFFER_SIZE )
  31:             rp := 0 ;
  32:         used := used - 1 ;
  33:         not_full.signal;
  34:     end
  35: begin
  36:     rp := 0 ;
  37:     wp := 0 ;
  38:     used := 0 ;
  39: end;
  40: 
  41: ...
  42: var buf : circular_buffer ;
  43: init buf;
  44: ...
  45: 

■Communicating Sequential Processes

C. A. R. Hoare: "Communicating sequential processes", Communications of the ACM, Volume 21 , No.8 pp.666-677, 1978. http://portal.acm.org/citation.cfm?doid=359576.359585

◆通信とプロセス

直接名前付けの通信。通信相手のプロセスを明示的に指定する。 (<−>間接では、メール・ボックスを指定する。)

Process A:

...
B ? x;   -- プロセス B からの通信を待つ。
         -- メッセージを受け取り、変数 x へ保存。
B ! x*2; -- プロセス B へ値 x * 2 を送る。
...

Process B:

...
y := 5;
A ! y + 1; -- プロセス A へ値 5 を送る。
...
A ? y;     -- プロセス A からの通信を待つ。
           -- メッセージを受け取り、変数 y へ保存。

プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。

◆制御構造、ガード付きコマンド、非決定性

ガード → 節
ガード付き節。条件式。 「→」の左がすべて真なら、「→」の右を実行する。
* [ <test> → <action> ]
繰り返し。while test do action <test>の部分は、ガード(guard)と呼ばれる。 ガードには、単純な条件式の他に、入力(受信)が書ける。出力(送信)は書けない。
名前::...
プロセスの名前付け。
||
並行実行。A || B で、プロセス A と B が並行に実行される。
 (正方形ではなく、縦長に書くのが正確。)
alternative command。非決定性の記述。
name ( val1, val2 )
構造体の定義。name が構造体の名前。val1, val2 は値。 フィールドには名前がつけられない。
(x,y) := (y,x)
名前がない構造体の代入。 並行に実行されるので、これで変数のswapが可能。
マクロ名 ≡ 本体
マクロ定義。
プロセスの配列 P(0..n)
意味としては、プロセスがマクロ展開される。 プロセスは添字で区別される。
マージ・プロセスの例。プロセスX, Y, Zのいずれかから送られてきたメッセー ジを受け取り、それをプロセス Sink に送る。
Merge:: c:character;
        *[    X ? c → Sink ! c
           □ Y ? c → Sink ! c
           □ Z ? c → Sink ! c ]

◆有限バッファ

   1: [ Buffer::
   2:     buf(0..bufsize-1): buffer_element;
   3:     first, last : integer;
   4:     j,k         : integer;
   5:     first :=0;
   6:     last :=0;
   7:     * [ (j: 1..numprod)
   8:         (last + 1) mod bufsize ≠ first;
   9:         Producer(j) ? buf(last) →
  10:             last := (last +1) mod bufsize
  11:         □
  12:         (k: 1..numcons)
  13:         first ≠ last;
  14:         Consumer(k) ? more() →
  15:             Consumer(k) ! buf(first);
  16:             first := (first + 1) mod bufsize
  17:       ]
  18: || (i:1..numprod) PRODUCER -- 生産者のプログラム
  19: || (i:1..numcons) CONSUMER -- 消費者のプログラム
  20: ]

◆Occam

Occam は、Inmos 社が CSP を元にして開発した言語。 Transputer は、Occam を実行するためのハードウェア。

■Ada

◆タスク

並行に動作するオブジェクト(現在の用語ではスレッド)を実現する。

◆仕様と本体

Adaは、カプセル化を重視した設計になっている。
仕様(specification)
インタフェースの記述。プログラムの他の要素から参照できる。仕様だ けわかれば、分割コンパイルできる。
本体(body)
プログラムのコード、変数など。

◆エントリ

プロセス間通信(タスク間通信)の受け口。仕様には、entry 文で記述する。 本体では、そのエントリへの通信が可能な場所にaccept 文を記述する。

仕様の例。1行のバッファ。

task line_block is
    entry add( c: in character);
end  line_block;
in は、入力パラメタの意味。タスク側から見て入力(受信)。

受け側の例

accept add( c: in character) do
    thisline(i) := c;
end add;

呼出し側の例

    line_block.add("d");
エントリを呼び出すと、呼び出したタスクは、ブロックされる。 受け入れ側で、accept ... end が終わると、ブロックが解除される。

◆ランデブ

プロセス1、プロセス2、ランデブ

図? 対称的なランデブ

呼出し側、accept側、end、ランデブ

図? Adaのランデブ

◆select文

CSP ガード付きコマンドの Ada 版。
select
    when  条件 =>
        accept ... do ... end;
or
    when  条件 =>
        accept ... do ... end;
or
    when  条件 =>
        delay seconds;
end
開いているエントリ
when の条件が真
閉じているエントリ
when の条件が偽

◆ラインプリンタのバッファ

   1: type line is array (1..120) of character;
   2: 
   3: task line_block is
   4:     entry add( c: in character);
   5:     entry plese_print( ln: out line);
   6: end  line_block;
   7: 
   8: task body line_block is
   9:     printer_trouble_time  : constant integer := 300;
  10:     print_line, fill_line : line;
  11:     nextfree              : integer;
  12:     print_ready           : boolean;
  13: begin
  14:     nextfree    := 1;
  15:     print_ready := false;
  16:     loop
  17:         select
  18:             when (nextfree < 121 ) =>
  19:                 accept add( c: in character) do
  20:                     fill_line(nextfree) := c;
  21:                 end add;
  22:                 nextfree := nextfree + 1;
  23:                 if( next_free = 121 ) and not print_ready then
  24:                     print_ready := true;
  25:                     nextfree    := 1;
  26:                     print_line  := fill_line;
  27:                 end if;
  28:         or
  29:             when print_ready =>
  30:                 accept please_print( ln: out line ) do
  31:                     ln := print_line;
  32:                 end please_print;
  33:                 if nextfree = 121 then
  34:                     print_line := fill_line;
  35:                     nextfree   := 1;
  36:                 else
  37:                     print_ready := false;
  38:                 end if;
  39:         or
  40:             when print_ready and (nextfree > 120) =>
  41:                 delay printr_trouble_time;
  42:                     printer_trouble;
  43:         end select;
  44:     end loop;
  45: end line_block;

■分散型プログラミング言語一般論

プログラミング言語は、ネットワークの存在や通信をさまざまな度合いで隠し ている。

問題:

「socket があれば、分散型プログラミング言語」とは、定義したくない。

◆分散型OSとネットワークOS

ネットワーク・オペレーティング・システム

ネットワークの機能はある。

利用者は、常にどの計算機を使っているのかを意識する必要がある。

コマンド

システムコール

分散型オペレーティング・システム

利用者は、仮想的な1台の計算機を使っているように感じ、計算機と計算機の 境界線が見えなくなる。分散透明性(network transparency)が実現されてい る。

目標

現在の技術

注意: 負荷分散の分散(load sharing/balancing)とこの講義のタイトルの分散 (distributed)は意味が違う。

◆ローカルとリモートの統合

A note on Distributed Computing [Waldo 1994]

この方法論にそって開発されたプログラミング言語もあるが、 そのような言語のアプリケーションの範囲は狭い。

◆ローカルのプログラミングと分散のプログラミングの違い

歴史 単に繋ぐだけでは、分散の難しい問題は、解決されない。

◆分散の難しい問題

例:

◆RMI付Javaは、分散型プログラミング言語か

◆分散型プログラミング言語の要件

  1. 位置と型の直行。
  2. 遠隔オブジェクト生成
  3. ローカルとリモートを区別したい時には区別できる
  4. ローカルとリモートを区別したくない時には区別しないでよい
  5. Garbage colection
  6. オブジェクト・マイグレーション
  7. グループ化
  8. 同期呼出しと非同期呼出しの両方の支援
  9. 分散でも、並行性を言語で支援したい

分散の難しい問題は、まだ汎用的に解ける技術はない。 汎用性がないと、プログラミング言語には採り入れにくい。

◆分散型プログラミング言語で分散システムが作れるか

分散OS上で分散システムが走るか。

■Emerald

初期(1980年代)の分散型言語。オブジェクト指向。

一般的な特徴

Emerald の目標。

分散透明性と移動は、矛盾している。

◆Emeraldの参照

Emerald では、オブジェクトのアクセスは、参照(reference)を通じて行われ る。参照は、ローカルもリモートも区別がない。 (分散透明性が実現されている)。

メソッド呼出しの引数も参照で渡される。オブジェクトの場所が違うと、重た い。コピーの方が速い。

◆Emeraldのオブジェクト・マイグレーション

Emerald には効率を挙げるために、オブジェクトを移動(migrate)させる機能 がある。
move X to Y
オブジェクト X を、オブジェクト Y のあるホストに移動。ヒント (システムは実際には移動させないこともある)。
fix X at Y
オブジェクト X を、オブジェクト Y のあるホストに移動して、 さらに動かないように固定する。
unfix X
オブジェクト X を、再び移動可能にする。
refix X at Z
fix されているオブジェクト X を、オブジェクト Z のあるホストに移 動して、さらに動かないように固定する。全体が atomic に行われる。

Call by reference が重たい。

call by move
引数をオブジェクトの方に移動させる。
call by visit
メソッドがリターンすると、オブジェクトも戻ってくる。
call by move return
結果のオブジェクトも戻ってくる。
移動の粒度が重要。オブジェクトのグループ化。 attached キーワードがついていると、いっしょに移動する。

■Voyager

Glue の Graham Glass より作成されたJava 用の分散オブジェクト。(Voyager の方が古い。)

◆Voyagerのオブジェクトの公開


import com.objectspace.voyager.*;

public class BallMachine  {

    public static void main(String[] args) {
		try {
			Voyager.startup("8000"); // als Server starten
			Ball ball = new Ball();
			Namespace.bind("EinBall",ball);
		} catch( Exception exception ) {
			System.err.println( exception );
		}
    }
}

◆Voyagerのオブジェクトの利用

import com.objectspace.voyager.*;

public class Bat {

    public void play(IBall ball) {
		System.out.println("Hitting the new Ball");
		ball.hit();
    }

    public static void main(String[] args) {
		try {
		    Voyager.startup(); // als Client starten
			Bat bat = new Bat();
			IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall");
			bat.play(ball);
		} catch( Exception exception ) {
			System.err.println( exception );
		}
		Voyager.shutdown();
    }  
}

◆Voyagerのリモート・リファレンスと移動

Proxy.of(obj) で、リモートリファレンスを生成する。 Mobility.of(obj)で、移動可能なオブジェクトを動的に生成する。

    IMobility mobileObj = Momility.of(obj);
    mobileObj.moveTo("url");

import com.objectspace.voyager.*;
import com.objectspace.voyager.mobility.*;

public class Bat {

   public void play(IBall ball, String url) {
	  try {
		 ball.hit();
		 System.out.println("Ball bewegen?");
		 Mobility.of(ball).moveTo(url);
		 System.out.println("Ball bewegt");
	  } catch (MobilityException e) {
		 System.out.println(e);
		 e.printStackTrace();
	  }
   }

   public static void main(String[] args) {
      try {
		 Voyager.startup("9001"); 
		 ClassManager.enableResourceServer();
		 Bat bat = new Bat();
		 //Ball newball= new Ball();
		 IBall ball = (IBall) Proxy.of(new Ball());
		 bat.play(ball,"//vsyspc5:8000");
		 System.out.println("Ball 1ste mal gespielt");
		 bat.play(ball,"//localhost:9001");
      } catch( Exception exception ) {
	    System.err.println( exception );
      }
      Voyager.shutdown();
   }
}

◆Voyagerの非同期メソッド呼出し

非同期メソッド呼出しがある。

    IA a1 = (IA) Factory.create("A","//sun:8000");
    a1.method(param1,param2);  // 同期 

    Result r1 = Future.invoke(object, "method", // 非同期
        new Object [] {param1,param2});
...
    if( result.isAvailable() )
    {
       int x = r1.readInt();
    }
メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable() メソッドで終了を待つ。readInt(), readByte(), readObject() で、int, byte, オブジェクトに戻す。

◆Voyagerのマルチキャスト

Voyager Space を使ってマルチキャストができる。
    ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace");

    A a1 = new A();
    subspace.add(a1);
    A a2 = new A();
    subspace.add(a2);

    a1.method1( param1, param2 );
    a2.method1( param1, param2 );

    Object [] params = new Object [] { param1, param2 };
    Multicast.invoke(subspace,"method1",params,"A");

■Dejay

◆Dejeyのオブジェクト

public class Hello implements java.io.Serializable  {
    public void sayHello(java.lang.String name) {
        System.out.println("Hello " + name);
    }
}
djeyc コンパイラで、2つの Java のクラスが作られる。

◆Dejeyの起動コード

	
import dejay.base.*;
public class HelloStartup {
    public static void main(String args[])
    {
        try {
	    DjProcessor p1 = new DjProcessor("//localhost:8000");
	    DjHello hello1 = new DjHello(p1);
	    hello1.sayHello("Thorsten");
	    p1.moveTo("//localhost:9000");
	    hello1.sayHello("Jan");
	    hello1.moveTo("//mac:9000");
        } catch (ConstructProcessorFailedException e) {
            System.out.println("Creating Virtual Processor failed."+e);
        }
    }
}

◆DjProcessor

1つ目の仮想プロセッサは、暗黙的に作られる。 2つ目以降は、明示的に作る。
    DjProcessor p1 = new DjProcessor("hostname:port");
これにより、ホスト hostname 上の ポート番号 port で新たな 仮想プロセッサが作られる。 ホスト hostname では、事前に Voyager デーモンを走らせておく必要がある。

◆遠隔オブジェクト

class A (.dj) から、自動的に スタブ(プロキシ)となる class DjA が生成される。 コンストラクタの引数が1個多い。
    DjA a1 = new DjA( Aの引数・・・, DjProcessor );
これで、遠隔の仮想プロセッサでオブジェクトが生成され、 遠隔参照(remote reference)が返される。 通常の参照と同じになるように頑張っているが、 一部違う。

    DjA a2 ;
    a2 = ... ;
    ...
    if( a2 == a1 ) // 遠隔のオブジェクトではなくてローカルのスタブの比較
    {
       ...
    }
equals() メソッドは、遠隔でも働く。

ローカル・オブジェクトを引数にして、 リモート・オブジェクトを呼ぶと、自動的に遠隔にコピーが渡される。 (遠隔オブジェクトが作られて、遠隔参照が渡されるのではない。)

DjX 型のオブジェクトを 遠隔にリターンすることはできる。 DjX がみつからなければ、 Xのコピーが返される。

remote.getCopy() で、ローカルのコピーが得られる。

    DjA a1 = new DjA(p1);
    A a2 = a1.getCopy();

◆メソッド呼出しのセマンティクス

3つのメソッド呼出しのセマンティクスがある。
    DjA a1 = new DjA(p1);
    B b1 = a.method1( 10 );		 // 同期
    B b2 = a.method1( 10, Dejey.ASYNC ); // 非同期
    a.method1( 10, Dejay.ONEWAY );       // 一方向
非同期には、完了待ちの方法が2種類ある
    B b1 = a.method1( 10, Dejey.ASYNC );
    b1.method2(); // wait by nesessity

    B b2 = a.method1( 10, Dejey.ASYNC );
    if( b2.isAvailable() )
    {
        b2.method2();
    }
    else
    {
        // do something
        b2.waitForResult();
        b2.method2();
    }

◆Dejeyの名前サービス

オブジェクトに文字列の名前を付けて、遠隔からアクセスできるように公開す ることができる。

2レベルの名前。

    DjProcessor p1 = new DjProcessor("lin:8000");
    p1.registerByName("p1");
    DjA a1 = new DjA(p1);
    a1.registerByName("a1");
    DjProcessor p1 = Dejay.getProcessorByName("p1");
    DjA a1 = p1.getObjectByName("a1");

◆永続性

分散と永続は、かなり関係している。 Dejay では、仮想プロセッサ単位で永続にできる。 DjProcessor のコンストラクタで、文字列の名前と データベースの名前を取る。
    DjProcessor p1 = new DjProcessor("PersistProc","ProcesssorDB","lin:8000");
    DjA a1 = new DjA();
    if( p1.isPersistable ) {
       p1.persist();
       p1.flush();
    }
persist() を呼ぶと、データベースに保存される。 flush() を呼ぶと、メモリが GC で回収可能になる。

persist() でデータベースに保存されたオブジェクトも、 使われる自動的にデータベースから回復される。

仮想プロセッサ全体がデータベースに保存できる。 次のようにして回復できる。

    DjProcessor p1 = Dejay.getProcessorFromDB("PersistProc",
        "ProcesssorDB","lin:8000");

◆制限

練習問題

★問題(801) CSPとAda

CSPとAdaで、概念が似ている点を1つ上げなさい。それについて簡単に説明し なさい。

★問題(802) Voyager

XML Web サービスを実現した Glue や Java RMI (Remote Method Invocation) は、遠隔手続き呼び出し(Remote Procedure Call) の機能を提供する。これら と比較して、Voyager が優れているを2つ上げなさい。そして、それらを簡単 に説明しなさい。

★問題(803) VoyagerとDejay

Voyager と比較して、Dejay が優れている点を1つ上げなさい。そして、それ を簡単に説明しなさい。
Last updated: 2012/02/08 22:13:23
Yasushi Shinjo / <yas@cs.tsukuba.ac.jp>