並行システム システム情報系情報工学域, システム情報工学研究科コンピュータサイエンス専攻 新城 靖 <yas@cs.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/cs/csys-2016/2016-05-23
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/cs/
http://www.cs.tsukuba.ac.jp/~yas/
http://portal.acm.org/citation.cfm?doid=359576.359585
Process A:
... B ? x; -- プロセス B からの通信を待つ。 Wait for a message from Process B. -- メッセージを受け取り、変数 x へ保存。 Receive a message and store it into the variable x. B ! x*2; -- プロセス B へ値 x * 2 を送る。Send the value x * 2 to Process B. ...
Process B:
... y := 5; A ! y + 1; -- プロセス A へ値 6 を送る。Send the value 6 to Process A. ... A ? y; -- プロセス A からの通信を待つ。Wait for a message from Process A. -- メッセージを受け取り、変数 y へ保存。Receive a message and store it into the variable y.
プロセスを指定する名前付けでは、ライブラリが作れない。マクロでごまかす。
名前::...
||
□
(正方形ではなく、縦長に書くのが正確。 (Unicode ▯))
Merge:: c:character; *[ X ? c → Sink ! c □ Y ? c → Sink ! c □ Z ? c → Sink ! c ]
1: [ Buffer:: 2: buf(0..bufsize-1): buffer_element; 3: first, last : integer; 4: j,k : integer; 5: first :=0; 6: last :=0; 7: * [ (j: 1..numprod) 8: (last + 1) mod bufsize ≠ first; 9: Producer(j) ? buf(last) → 10: last := (last +1) mod bufsize 11: □ 12: (k: 1..numcons) 13: first ≠ last; 14: Consumer(k) ? more() → 15: Consumer(k) ! buf(first); 16: first := (first + 1) mod bufsize 17: ] 18: || (i:1..numprod) PRODUCER -- 生産者のプログラム 19: || (i:1..numcons) CONSUMER -- 消費者のプログラム 20: ]
言語としては、何種類か(Plasma、Act-1、Act-2、Act-3、Omega、Ether)ある。 各言語で見かけがけっこうちがう。
図? アクタの基本概念/Basic concepts of the Actor model
アクタは、メッセージを受け取り、それに対して行為を行うような、通信を行う エージェントである。行為の種類には、次のようなものがある。
アクタがメッセージを受け取ると、いったんロックされる。ロックされるとメッ セージは処理されない。次のアクタに become したら、新しい後継のアクタが 同じメールボックスからメッセージを読み出して処理を続ける。
要素
goto f(1, 2, 3);のようなもの。
通常の階乗(in Scheme)
(define (fact n) (if (= n 0) 1 (* n (fact (- n 1)))))継続を取るような階乗
(define (fact-c n c) (if (= n 0) (c 1) (let ((c2 (lambda (x) (c (* n x))))) (fact-c (- n 1) c2))))
実行例
> (fact 3)
6
> (fact 4)
24
> (fact-c 3 print)
6> (fact-c 4 print)
24>
factorial≡λm. match m <n c> if n = 1 then (send c <1>) else if n > 1 then (send factorial <(n-1) (λk.(send c <n * k>))>)3の階乗を計算して、継続print_answerに送りたい時には、 次のように書く。
(send factorial <3 print_answer>)
(define (Factorial( )) (Is-Communication (a doit (with customer =m) (with number =n)) do (become Factorial) (if (NOT (= n 0)) (then (send m 1)) (else (let (x = (new FactCust (with customer m) (with number n))) (send Factorial (a doit (with customer x) (with number (- n 1))))))))) (define (FactCust (with customer =m) (with number =n)) (Is-Communicaton (a number k) do (send m (* n k))))
振る舞い記述 (define (名前 (with id パタン)) 通信ハンドラの並び) 通信ハンドラ (Is-Communication パタン do コマンド) letコマンド (let (変数名 = 式) do コマンド) 条件コマンド (if 式 (then do コマンドの並び) (else do コマンドの並び)) メッセージ送信コマンド (send メールボックス 値) becomコマンド (become 式) 新しいアクタの生成 (new 式)
(define (Account (with Balance =b)) (Is-Request (a Balance) do (become (Account (with Balance b))) (reply b)) (Is-Request (a Deposit (with Amount =a)) do (become (Account (with Balance (+ b a)))) (reply (a Deposit-Receipt (with Amount a)))) (Is-Request (a Withdrawal (with Amount =a)) do (if (> a b) (then do (become (Account (with Balnce b))) (complain (an Overdraft))) (else do (become (Account (with Balnce(- b a)))) (reply (a Withdrawal-Receipt (with Amount =a)))))))
http://www.ibm.com/developerworks/jp/java/library/j-scala04109.html
Ted Neward: "The busy Java developer's guide to Scala: Dive
deeper into Scala concurrency", IBM DeveloperWorks, 10 Apr 2009.
http://www.ibm.com/developerworks/java/library/j-scala04109.html
リスト 9. Counter の例 (アクターによる方法) より。
object CountingSample { case class Incr case class Value(sender : Actor) case class Lock(sender : Actor) case class UnLock(value : Int) class Counter extends Actor { override def act(): Unit = loop(0) def loop(value: int): Unit = { receive { case Incr() => loop(value + 1) case Value(a) => a ! value; loop(value) case Lock(a) => a ! value receive { case UnLock(v) => loop(v) } case _ => loop(value) } } } def main(args : Array[String]) : Unit = { val counter = new Counter counter.start() counter ! Incr() counter ! Incr() counter ! Incr() counter ! Value(self) receive { case cvalue => Console.println(cvalue) } counter ! Incr() counter ! Incr() counter ! Value(self) receive { case cvalue => Console.println(cvalue) } } }
http://www.recursionsw.com/products/voyager/voyager-intro.html
import com.objectspace.voyager.*; public interface IBall { public void hit(); }
import com.objectspace.voyager.*; public class Ball implements IBall { public void hit() { System.out.println("Ball has been hit"); } }
import com.objectspace.voyager.*; public class BallMachine { public static void main(String[] args) { try { Voyager.startup("8000"); // als Server starten Ball ball = new Ball(); Namespace.bind("EinBall",ball); } catch( Exception exception ) { System.err.println( exception ); } } }
import com.objectspace.voyager.*; public class Bat { public void play(IBall ball) { System.out.println("Hitting the new Ball"); ball.hit(); } public static void main(String[] args) { try { Voyager.startup(); // als Client starten Bat bat = new Bat(); IBall ball = (IBall) Namespace.lookup("//vsyspc5.informatik.uni-hamburg.de:8000/EinBall"); bat.play(ball); } catch( Exception exception ) { System.err.println( exception ); } Voyager.shutdown(); } }
IMobility mobileObj = Momility.of(obj); mobileObj.moveTo("url");
import com.objectspace.voyager.*; import com.objectspace.voyager.mobility.*; public class Bat { public void play(IBall ball, String url) { try { ball.hit(); System.out.println("Ball bewegen?"); Mobility.of(ball).moveTo(url); System.out.println("Ball bewegt"); } catch (MobilityException e) { System.out.println(e); e.printStackTrace(); } } public static void main(String[] args) { try { Voyager.startup("9001"); ClassManager.enableResourceServer(); Bat bat = new Bat(); //Ball newball= new Ball(); IBall ball = (IBall) Proxy.of(new Ball()); bat.play(ball,"//vsyspc5:8000"); System.out.println("Ball 1ste mal gespielt"); bat.play(ball,"//localhost:9001"); } catch( Exception exception ) { System.err.println( exception ); } Voyager.shutdown(); } }
IA a1 = (IA) Factory.create("A","//sun:8000"); a1.method(param1,param2); // 同期 synchronous Result r1 = Future.invoke(a1, "method", // 非同期 asynchronous new Object [] {param1,param2}); ... if( r1.isAvailable() ) { int x = r1.readInt(); }メソッド名を文字列で渡す。結果として、Result 型を返す。isAvailable() メソッドで終了を待つ。readInt(), readByte(), readObject() で、int, byte, オブジェクトに戻す。
ISubspace subspace = (ISubspace) Namespace.lookup("//sun:9000/Subspace"); A a1 = new A(); subspace.add(a1); A a2 = new A(); subspace.add(a2); a1.method1( param1, param2 ); a2.method1( param1, param2 ); Object [] params = new Object [] { param1, param2 }; Multicast.invoke(subspace,"method1",params,"A");
ロックよりも楽にプログラムを作りたい。 We would like to write programs more easily than that with locks.
特に分散では。集中でも、フォールト・トレランスの実現では必要。 Especially in distributed systems. Transaction is essential to realize fault tolerance.
歴史:1960年代、テープの時代。 失敗したら、巻き戻す(rollback)。
銀行口座間の預金の移動。
注意:Java の serializable とは意味が全く違う。
複数のトランザクションは、並行に実行可能。しかし、その複数のトランザク ションの最終結果は、それらを「ある順序」で「逐次的に」実行した時と同じ結果 になる。
逆に、トランザクションは、直列化可能なスケジュールが存在するなら、並行 に実行してもよい。直列化可能でないなら、トランザクションは、開始を遅ら せるか、アボートする。
図? トランザクションの処理(並行実行)/Concurrent execution of transactions 図? トランザクションの処理(直列化の例その1)/A serialized execution of transaction No.1 図? トランザクションの処理(直列化の例その2)/A serialized execution of transaction No.2 図? トランザクションの処理(直列化の例その3)/A serialized execution of transaction No.3
Begin_Transaction(); reserve("NRT-SFO"); reserve("SFO-MCO"); Commit();プログラムの記述としては、これだけ。どこにもロックは出てこない。内部的 には、ロックは行われる(ことがある)。
途中の reserve() が失敗したら、トランザクション全体を abort() する。
図? ファイルの変更されたブロックだけをコピー)
シャドー・コピーを作らず、ファイルのその場所を書き換える。 その前に、次の情報を安定記憶にログとして書き込む。
クラッシュからの回復もできる。
図? 安定記憶の実現
更新時:3 の前にクラッシュしても、常にディスク1が正しい(図?(b))。 この場合は、ディスク1からディスク1へコピーする。
チェックサムエラーが起きた場合(図?(c))、他のディスクからコピーして回復する。
もっとも単純な方法: Begin Transaction で、ファイルをロックする。 Commit/Abort でアンロックする。
注意:プログラマは、ロックを意識しない。
制約が強すぎる。
読み取りロック:read lock と write lock を分ける。
ロックの粒度(granularity)が大事
図? 2相ロック/Two-phase locking
定理:トランザクションの実現で2相ロックを使うならば、インターリーブさ れることで作られるスケジュールは直列化可能である。
実際のシステムでは、Commit/Abort するまで、一度確保したロックをはなさない。
第2相で、まとめて全部ロックを解放する(strict two-phase locking)。
図? 2相ロック/Strict two-phase locking
利点
注意:2相ロックと2相コミットは別。
実現(衝突時の対応):プライベート作業空間を使う実現が便利。 どんどんプライベート作業空間上のファイルを更新して、最後にコミットする か削除する。
性質
アルゴリズム:
Lamport のアルゴリズム等でクロックが同期されていれば、分散システムでも 使える。
2種類のプロセス
Barbara Liskov: "Distributed programming in Argus", Communications of
the ACM, Vol.31, No.3, pp.300-312, 1988.
http://dl.acm.org/citation.cfm?id=42399
文法は、CLU 言語に似ている。
参考:
http://www2.gssm.otsuka.tsukuba.ac.jp/staff/kuno/lectures/
1993.5 ゼミ: CLU言語入門@GSSM
http://www.pmg.lcs.mit.edu/CLU.html
CLU Home Page.
1: branch = guardian is create handles total, open, close, deposit, withdraw 2: % type definitions 3: htable = atomic_array[bucket] 4: bucket = atomic_array[pair] 5: pair = atomic_record[num: account_number, acct: acct_info] 6: acct_info = atomic_record[bal: int] 7: account_number = atomic_record[code: string, num: int] 8: intcell = atomic_record[val: int] 9: 10: stable ht: htable % the table of accounts 11: stable code: string % the code for the branch 12: stable seed: intcell % the seed for generating new account numbers 13: 14: create = creator (c: string, size: int) returns (branch) 15: code := c 16: seed.val := 0 17: ht := htable$new() 18: for i: int in int$from_to(1, size) do 19: htable$addh(ht, bucket$new()) 20: end 21: return (self) 22: end create 23: 24: total = handler () returns (int) 25: sum: int := 0 26: for b: bucket in htable$elements(ht) do 27: for p: pair in bucket$elements(b) do 28: sum := sum + p.acct.bal 29: end 30: end 31: return (sum) 32: end total 33: 34: open = handler () returns (account_number) 35: intcell$write_lock(seed) % get a write lock on the seed 36: a: account_number := account_number${code: code, num: seed.val} 37: seed.val := seed.val + 1 38: bucket$addh(ht[hash(a.num)], pair${num: a, acct: acct_info${bal: 0}}) 39: return (a) 40: end open 41: 42: close = handler (a: account_number) signals (no_such_acct, positive_balance) 43: b: bucket := ht[hash(a.num)] 44: for i: int in bucket$indexes(b) do 45: if b[i].num != a then continue end 46: if b[i].acct.bal > 0 then signal positive-balance end 47: b[i] := bucket$top(b) % store topmost element in place of closed account 48: bucket$remh(b) % discard topmost element 49: return 50: end 51: signal no_such_acct 52: end close 53: 54: lookup = proc (a: account_number) returns (acct_info) signals (no_such__acct) 55: for p: pair in bucket$elements(ht[hash(a.num)]) do 56: if p.num = a then return (p.acct) end 57: end 58: signal no_such_acct 59: end lookup 60: 61: deposit = handler (a: account_number, amt: int) signals (no_such_acct, 62: negativ_amount) 63: if amt < 0 then signal negative_amount end 64: ainfo: acct_info := lookup(a) resignal no_such_acct 65: ainfo.bal := ainfo.bal + amt 66: end deposit 67: 68: withdraw = handler (a: account_number, amt: int) signals (no_such_acct, 69: negative_amount, insufficient_funds) 70: if amt < 0 then signal negative amount end 71: ainfo: acct_info := lookup(a) resignal no_such_acct 72: if ainfo.bal < amt then signal insufficient_funds end 73: ainfo.bal := ainfo.bal - amt 74: end withdraw 75: end branch
if( a() ) { b(); } else { c(); }
図? 投機的実行 Speculative execution
この例では、a() が完了した時には、b()、c() が終わっていた時には、見かけ 上、即座に終わる。1999年に、サン・マイクロシステムズ社のビル・ジョイらによって開発され発 表された。
Jini の目標は、ネットワークで Plug & Play を実現すること。 機器をネットワークに接続しただけで、特別な設定 をしなくてもすぐに利用可能になること。
目標
JavaSpaces は、内部的に Jini のルックアップ・サービスの実現で使われて いる。JavaSpaces も、Jini でアクセス可能なサービスの1つと考えることも できる。
Jini のクラスライブラリは、 JavaSpaces の回 で紹介した Apache River が利用できる。
Microsoft の、Jini に対抗した技術。
サービスの登録と検索
ルックアップ・サービス自身も、最初から知られている必要はない。 ネットワーク上で自動的に探される。 Discovery と Join。
Java のオブジェクトが Lease。
リース期間は、延長することができる。 延長されなかった lease は、ルックアップ・サービスから削除される。 登録されているサービスを明示的に削除する仕組みは、存在しない。
サービスが削除された時には、分散型イベント配送サービスにより関係してい る所に知らされる。
トランザクションのインタフェースは定められているが、具体的な実現は各サー ビスにまかされている。
サービスの提供者とサービスの利用者の間は、最終的にはに RMI で接続される。
ServiceIDLister インタフェースを implements する。
package com.sun.jini.lookup; public interface ServiceIDListener extends java.util.EventListener { void serviceIDNotify(net.jini.core.lookup.ServiceID serviceID); }ルックアップ・サービスからコールバックされる。
Discovery の実現方法
図1 マルチキャストによる Discovery/Discovery by multicast
図2 Join
グループは、名前(文字列)で区別される。
Jini パッケージは、JoinManager という参照クラスを含む。
public JoinManager(Object obj, Entry[] attrSets, ServiceIDListener callback, LeaseRenewalManager leaseMgr) throws IOException public JoinManager(Object obj, Entry[] attrSets, String[] groups, LookupLocator[] locators, ServiceIDListener callback, LeaseRenewalManager leaseMgr ) throws IOException
import java.rmi.*; public interface RemoteBall extends Remote { public void hit() throws java.rmi.RemoteException; }
import java.rmi.*; import java.rmi.server.*; import net.jini.core.lookup.*; import com.sun.jini.lookup.*; public class Ball extends UnicastRemoteObject implements RemoteBall, ServiceIDListener { public Ball() throws RemoteException { super(); } public void serviceIDNotify(ServiceID id) { System.out.println("ServiceId is "+id); } public void hit() { System.out.println("Ball has been hit"); } }
import java.rmi.*; import net.jini.core.entry.*; import net.jini.lookup.entry.*; import com.sun.jini.lookup.*; import com.sun.jini.lease.*; public class BallStarter { public static void main(String[] args) { try { System.setSecurityManager(new RMISecurityManager()); RemoteBall ball = (RemoteBall) new Ball(); LeaseRenewalManager renewal = new LeaseRenewalManager(); Entry[] attributes = new Entry [] { new Name("Jini enabled ball")}; JoinManager join = new JoinManager( ball, attributes, (Ball) ball, renewal ); System.out.println("Ball started and registered at Lookup-Server"); } catch (Exception e) { e.printStackTrace(); } } }
図3 Lookup
サービスは、次の3つで区別される。public ServiceTemplate(ServiceID serviceID, Class[] serviceTypes, Entry[] attrSetTemplates)サービス(オブジェクト)の探し方
import java.rmi.*; import net.jini.core.discovery.*; import net.jini.core.lookup.*; public class Bat { public Ball ball; public void play(RemoteBall ball) { try { ball.hit(); System.out.println("I hit the ball"); } catch (RemoteException e) { System.out.println(e); } } public static void main (String[] args) { Bat bat = new Bat(); try { System.setSecurityManager(new RMISecurityManager()); LookupLocator locator = new LookupLocator("jini://localhost"); ServiceRegistrar registrar = locator.getRegistrar(); Class[] classes = new Class[] { RemoteBall.class }; ServiceTemplate template = new ServiceTemplate( null, classes, null); RemoteBall remoteBall = (RemoteBall) registrar.lookup(template); bat.play(remoteBall); } catch (Exception e) { e.printStackTrace(); } } }
信頼性が低いネットワークと、どう戦う方法の1つ。
lease には、期限がある。
期限の長さは、交渉可能。 期限が短い(1分以下、数秒)というのは、あまり想定されていない。
リースの利点
Jini のサービスを提供しているオブジェクトは、Lease インタフェースを実 装している。
public interface Lease { long FOREVER = Long.MAX_VALUE; ... long getExpiration(); void renew(long duration) throws LeaseDeniedException, UnknownLeaseException, RemoteException; void cancel() throws UnknownLeaseException, RemoteException; ... }getExpiration() で、リースの残り時間がわかる。ミリ秒単位。
renew() で延長する。 延長できない時には、LeaseDeniedException が返される。
もう使わなくなった時には、cancel() できる。 期限切れと同じことになる。
リースの管理には、LeaseRenewalManager を使う。
Compare CSP and Concurrent Paccal, and describe an essential similarity and an essential difference.
How does the Actor model express concurrency? Explain it briefly.