SunRPCの内部構造

 並行分散ソフトウェア/並列分散ソフトウェア

                                       電子・情報工学系
                                       新城 靖
                                       <yas@is.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/pdsoft-2005/2006-01-27
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/

連絡

後半のグループ通信の部分は、1月13日のものと同じです。

SunRPC のソースコード: ftp://playground.sun.com/pub/rpc/rpcsrc_40.tar.Z

復習

SunRPCプログラミング

SunRPCのメッセージ

rpc_msg.h を書直したもの。一部正確ではない部分が含まれるかもしれない。
struct opaque_auth {
	enum_t	oa_flavor;		/* flavor of auth */
	opaque  oa_data<> ;
};

union rejected_reply
switch(enum reject_stat rj_stat)
{
case RPC_MISMATCH:
    RJ_versions_t RJ_versions;
case AUTH_ERROR:
    enum auth_stat RJ_why;
};

/* Body of an accepted reply, discriminated by the accept status ar_stat. */
union accepted_reply_body
switch(enum accept_stat ar_stat)
{
case SUCCESS:
    /* End of the header. */
    /* The reply message follows. */
case PROG_UNAVAIL:
    void;
case PROG_MISMATCH:
    var_vers ar_vers;
case PROC_UNAVAIL:
    void;
case GARBAGE_ARGS:
    void;
case SYSTEM_ERR:
    void;
};

struct accepted_reply {
	opaque_auth		ar_verf;
	accepted_reply_body	ar_body;
};

union reply_body
switch(enum reply_stat rp_stat)
{
case MSG_ACCEPTED:
	struct accepted_reply RP_ar;
case MSG_DENIED:
	struct rejected_reply RP_dr;
};

/* Header of a call (request) message. */
struct call_body {
	u_long cb_rpcvers;	/* must be equal to two */
	u_long cb_prog;
	u_long cb_vers;
	u_long cb_proc;
	struct opaque_auth cb_cred;
	struct opaque_auth cb_verf; /* protocol specific - provided by client */
	/* End of the header. */
	/* The body of the request message follows. */
};

union rpc_msg_body
switch(msg_type rm_direction){
case CALL:
	struct call_body RM_cmb;
case REPLY:
	struct reply_body RM_rmb;
};

struct rpc_msg {
	u_long	rm_xid;
    	rpc_msg_body rm_body;
};

SunRPCのクライアント側のコード

CLIENT構造体

クライアント側で使う構造体。
CLIENT *
clnt_create(host, prog, vers, proto)
char *host;
u_long prog, vers;
char *proto;

       Generic client creation routine.   host  identifies
       the  name  of  the  remote host where the server is
       located.  proto indicates which kind  of  transport
       protocol to use. The currently supported values for
       this field are "udp" and "tcp".   Default  timeouts
       are  set, but can be modified using clnt_control().

       Warning: Using UDP  has  its  shortcomings.   Since
       UDP-based RPC messages can only hold up to 8 Kbytes
       of encoded data, this transport cannot be used  for
       procedures that take large arguments or return huge
       results.
clnt_generic.c
/*
 * Generic client creation: takes (hostname, program-number, protocol) and
 * returns client handle. Default options are set, which the user can 
 * change using the rpc equivalent of ioctl()'s.
 */
CLIENT *
clnt_create(hostname, prog, vers, proto)
        char *hostname;
        unsigned prog;
        unsigned vers;
        char *proto;
{
        struct sockaddr_in sin;
...
        h = gethostbyname(hostname);
...
        bcopy(h->h_addr, (char*)&sin.sin_addr, h->h_length);
...
        p = getprotobyname(proto);
        switch (p->p_proto) {
        case IPPROTO_UDP:
...
                client = clntudp_create(&sin, prog, vers, tv, &sock);
...
                break;
        case IPPROTO_TCP:
                client = clnttcp_create(&sin, prog, vers, &sock, 0, 0);
...
                break;
        return (client);
}

CLIENT 構造体は、Java, C++ のアブストラクトクラス相当。 内部に関数のテーブルを持っている。

/*
 * Client rpc handle.
 * Created by individual implementations, see e.g. rpc_udp.c.
 * Client is responsible for initializing auth, see e.g. auth_none.c.
 */
typedef struct {
        AUTH    *cl_auth;                       /* authenticator */
        /*
         * Operations vector: one function pointer per operation.
         * The clnt_*() macros dispatch through this table.
         */
        struct clnt_ops {
                enum clnt_stat  (*cl_call)();   /* call remote procedure */
                void            (*cl_abort)();  /* abort a call */
                void            (*cl_geterr)(); /* get specific error code */
                bool_t          (*cl_freeres)(); /* frees results */
                void            (*cl_destroy)();/* destroy this structure */
                bool_t          (*cl_control)();/* the ioctl() of rpc */
        } *cl_ops;                              /* e.g. &tcp_ops, set by clnttcp_create() */
        caddr_t                 cl_private;     /* private stuff (struct ct_data for TCP) */
} CLIENT;

/*
 * Front-end macros for the CLIENT operations vector.
 * Each one calls the matching function pointer in rh->cl_ops and passes
 * the handle itself as the first argument.
 * The handle argument is expanded twice, so it must not have side effects.
 */
#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs)     \
        ((*(rh)->cl_ops->cl_call)(rh, proc, xargs, argsp, xres, resp, secs))
#define clnt_abort(rh)  ((*(rh)->cl_ops->cl_abort)(rh))
#define clnt_geterr(rh,errp)    ((*(rh)->cl_ops->cl_geterr)(rh, errp))
#define clnt_freeres(rh,xres,resp) ((*(rh)->cl_ops->cl_freeres)(rh,xres,resp))
#define clnt_control(cl,rq,in) ((*(cl)->cl_ops->cl_control)(cl,rq,in))
#define clnt_destroy(rh)        ((*(rh)->cl_ops->cl_destroy)(rh))

関数のポインタと固有のデータ構造を含む。

図? SunRPC のクライアント側で使う構造体

CLIENT構造体とオブジェクト指向

抽象クラスClientをクラスClientTCPとClientUDPが継承している。

図? CLIENT 構造体と抽象クラス

C++ 的な表記
class client
{
public:
    struct AUTH *cl_auth;
    virtual enum clnt_stat
    cl_call(u_long proc, xdrproc_t xargs,
	    caddr_t argsp, xdrproc_t xres,
	    caddr_t resp,struct timeval timeout);
    virtual void clnt_abort(void);
    virtual void clnt_geterr(struct rpc_err *errp);
    virtual bool_t clnt_freeres(xdrproc_t xres, caddr_t resp);
    virtual bool_t clnt_control(u_int request, char *info);
};

class client_tcp : client {
	int		ct_sock;
	bool_t		ct_closeit;
	struct timeval	ct_bool;
	wait_t          ct_waitset;       /* wait set by clnt_control? */
	struct sockaddr_in ct_addr; 
	struct rpc_err	ct_error;
	char		ct_mcall[MCALL_MSG_SIZE];	/* marshalled callmsg */
	u_int		ct_mpos;			/* pos after marshal */
	XDR		ct_xdrs;
};
Java風の表記
/*
 * The CLIENT handle rewritten as a Java abstract class.
 * Each entry of struct clnt_ops becomes an abstract method.
 */
public abstract class Client
{
    AUTH cl_auth;	/* authenticator */
    public abstract clnt_stat 
        cl_call(long proc, xdrproc_t xargs,
		caddr_t argsp, xdrproc_t xres,
		caddr_t resp, timeval timeout);
    public abstract void clnt_abort();
    public abstract rpc_err clnt_geterr();
    public abstract boolean clnt_freeres(xdrproc_t xres, caddr_t resp);
    public abstract boolean clnt_control(int request, Object info);
};

/*
 * TCP transport: the fields are those of struct ct_data (clnt_tcp.c),
 * and every abstract method of Client gets a concrete body.
 */
public class ClientTCP extends Client
{
    static final int MCALL_MSG_SIZE = 24;
    int	        ct_sock;
    boolean	ct_closeit;
    timeval	ct_wait;
    boolean     ct_waitset; /* wait set by clnt_control? */
    sockaddr_in ct_addr; 
    rpc_err	ct_error;
    byte	ct_mcall[] = new byte[MCALL_MSG_SIZE];
				/* marshalled callmsg */
    int		ct_mpos;	/* pos after marshal */
    XDR		ct_xdrs;

    public clnt_stat 
    cl_call(long proc, xdrproc_t xargs,
		caddr_t argsp, xdrproc_t xres,
		caddr_t resp, timeval timeout)
    { 実体あり }
    public void clnt_abort()
    { 実体あり }
    public rpc_err clnt_geterr()
    { 実体あり }
    public boolean clnt_freeres(xdrproc_t xres, caddr_t resp)
    { 実体あり }
    public boolean clnt_control(int request, Object info)
    { 実体あり }
};

CLIENT構造体の初期化(TCP)

clnt_tcp.c
static int	readtcp();
static int	writetcp();

static enum clnt_stat   clnttcp_call();
static void             clnttcp_abort();
static void             clnttcp_geterr();
static bool_t           clnttcp_freeres();
static bool_t           clnttcp_control();
static void             clnttcp_destroy();

/*
 * Operations vector for the TCP transport; clnttcp_create() stores its
 * address in CLIENT.cl_ops.
 * The initializers are positional, so their order must follow the member
 * order of struct clnt_ops (call, abort, geterr, freeres, destroy, control).
 */
static struct clnt_ops tcp_ops = {
        clnttcp_call,
        clnttcp_abort,
        clnttcp_geterr,
        clnttcp_freeres,
        clnttcp_destroy,
        clnttcp_control
};

/*
 * Per-handle private state of a TCP client; CLIENT.cl_private points here.
 */
struct ct_data {
        int             ct_sock;          /* copy of *sockp given to clnttcp_create() */
        bool_t          ct_closeit;       /* TRUE when clnttcp_create() created the socket itself */
        struct timeval  ct_wait;          /* set from the call's timeout unless ct_waitset */
        bool_t          ct_waitset;       /* wait set by clnt_control? */
        struct sockaddr_in ct_addr;       /* copy of the server address *raddr */
        struct rpc_err  ct_error;         /* status of the most recent call */
        char            ct_mcall[MCALL_MSG_SIZE];       /* marshalled callmsg */
        u_int           ct_mpos;                        /* pos after marshal */
        XDR             ct_xdrs;          /* XDR stream used to send calls and read replies */
};

/*
 * Create a client handle for a tcp/ip connection.
 * If *sockp<0, *sockp is set to a newly created TCP socket and it is
 * connected to raddr.  If *sockp non-negative then
 * raddr is ignored.  The rpc/tcp package does buffering
 * similar to stdio, so the client must pick send and receive buffer sizes;
 * 0 => use the default.
 * If raddr->sin_port is 0, then a binder on the remote machine is
 * consulted for the right port number.
 * NB: *sockp is copied into a private area.
 * NB: It is the clients responsibility to close *sockp.
 * NB: The rpch->cl_auth is set null authentication.  Caller may wish to set this
 * something more useful.
 */
CLIENT *
clnttcp_create(raddr, prog, vers, sockp, sendsz, recvsz)
	struct sockaddr_in *raddr;
	u_long prog;
	u_long vers;
	register int *sockp;
	u_int sendsz;
	u_int recvsz;
{
	CLIENT *h;
	register struct ct_data *ct;
	struct timeval now;
	struct rpc_msg call_msg;

	h  = (CLIENT *)mem_alloc(sizeof(*h));
...
	ct = (struct ct_data *)mem_alloc(sizeof(*ct));
...
	if (raddr->sin_port == 0) {
		u_short port;
		if ((port = pmap_getport(raddr, prog, vers, IPPROTO_TCP)) == 0) {
			mem_free((caddr_t)ct, sizeof(struct ct_data));
			mem_free((caddr_t)h, sizeof(CLIENT));
			return ((CLIENT *)NULL);
		}
		raddr->sin_port = htons(port);
	}
	if (*sockp < 0) {
		*sockp = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
		(void)bindresvport(*sockp, (struct sockaddr_in *)0);
...
		ct->ct_closeit = TRUE;
	} else {
		ct->ct_closeit = FALSE;
	}

	/*
	 * Set up private data struct
	 */
	ct->ct_sock = *sockp;
	ct->ct_wait.tv_usec = 0;
	ct->ct_waitset = FALSE;
	ct->ct_addr = *raddr;

	/*
	 * Initialize call message
	 */
	(void)gettimeofday(&now, (struct timezone *)0);
	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
	call_msg.rm_direction = CALL;
	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
	call_msg.rm_call.cb_prog = prog;
	call_msg.rm_call.cb_vers = vers;

	/*
	 * pre-serialize the static part of the call msg and stash it away
	 */
	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE,
	    XDR_ENCODE);
	if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
		if (ct->ct_closeit) {
			(void)close(*sockp);
		}
		goto fooy;
	}

	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
	XDR_DESTROY(&(ct->ct_xdrs));
	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz,
	    (caddr_t)ct, readtcp, writetcp);
	h->cl_ops = &tcp_ops;
	h->cl_private = (caddr_t) ct;
	h->cl_auth = authnone_create();
	return (h);

fooy:
	/*
	 * Something goofed, free stuff and barf
	 */
	mem_free((caddr_t)ct, sizeof(struct ct_data));
	mem_free((caddr_t)h, sizeof(CLIENT));
	return ((CLIENT *)NULL);
}

スタブの例(dirlist_1())

rpcgen により自動生成されたもの。
   1:	/*
   2:	 * Please do not edit this file.
   3:	 * It was generated using rpcgen.
   4:	 */
   5:	
   6:	#include <memory.h> /* for memset */
   7:	#include "dirlist.h"
   8:	
   9:	/* Default timeout can be changed using clnt_control() */
  10:	static struct timeval TIMEOUT = { 25, 0 };
  11:	
  12:	dirlist_res *
  13:	dirlist_1(char **argp, CLIENT *clnt)
  14:	{
  15:	        static dirlist_res clnt_res;
  16:	
  17:	        memset((char *)&clnt_res, 0, sizeof(clnt_res));
  18:	        if (clnt_call (clnt, DIRLIST,
  19:	                (xdrproc_t) xdr_wrapstring, (caddr_t) argp,
  20:	                (xdrproc_t) xdr_dirlist_res, (caddr_t) &clnt_res,
  21:	                TIMEOUT) != RPC_SUCCESS) {
  22:	                return (NULL);
  23:	        }
  24:	        return (&clnt_res);
  25:	}
enum clnt_stat
clnt_call(clnt, procnum, inproc, in, outproc, out, tout)
CLIENT *clnt;
u_long procnum;
xdrproc_t inproc, outproc;
char *in, *out;
struct timeval tout;

       A  macro  that  calls  the remote procedure procnum
       associated with the client handle, clnt,  which  is
       obtained  with  an RPC client creation routine such
       as clnt_create().  The parameter in is the  address
       of  the  procedure's  argument(s),  and  out is the
       address of where to place the result(s); inproc  is
       used to encode the procedure's parameters, and out-
       proc is used to  decode  the  procedure's  results;
       tout  is the time allowed for results to come back.
マクロにより、TCP の場合、clnt_tcp.c の clnttcp_call() に行き着く。
/*
 * cl_call operation of the TCP transport (reached through clnt_call()).
 *
 * Sends one call record made of the pre-serialized header bytes in
 * ct_mcall, the procedure number, the marshalled authenticator and the
 * arguments encoded by xdr_args.  Unless the call is one-way, it then
 * reads reply records until one carries the expected transaction id and
 * decodes the results with xdr_results into results_ptr.
 *
 * Returns the status that is also left in ct->ct_error.re_status.
 */
static enum clnt_stat
clnttcp_call(h, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
	register CLIENT *h;
	u_long proc;
	xdrproc_t xdr_args;
	caddr_t args_ptr;
	xdrproc_t xdr_results;
	caddr_t results_ptr;
	struct timeval timeout;
{
	register struct ct_data *ct = (struct ct_data *) h->cl_private;
	register XDR *xdrs = &(ct->ct_xdrs);
	struct rpc_msg reply_msg;
	u_long x_id;
	/* the first word of the pre-serialized call header is used as the xid */
	u_long *msg_x_id = (u_long *)(ct->ct_mcall);	/* yuk */
	register bool_t shipnow;
	int refreshes = 2;	/* upper bound on retries after AUTH_REFRESH */

	/* the caller's timeout applies only when ct_waitset is not set */
	if (!ct->ct_waitset) {
		ct->ct_wait = timeout;
	}

	/*
	 * shipnow is FALSE only when no result decoder is given AND the
	 * timeout is zero; it is handed to xdrrec_endofrecord() below.
	 */
	shipnow =
	    (xdr_results == (xdrproc_t)0 && timeout.tv_sec == 0
	    && timeout.tv_usec == 0) ? FALSE : TRUE;

call_again:
	xdrs->x_op = XDR_ENCODE;
	ct->ct_error.re_status = RPC_SUCCESS;
	/* new transaction id: decrement the stored word in place, keep a host-order copy */
	x_id = ntohl(--(*msg_x_id));
	if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
	    (! XDR_PUTLONG(xdrs, (long *)&proc)) ||
	    (! AUTH_MARSHALL(h->cl_auth, xdrs)) ||
	    (! (*xdr_args)(xdrs, args_ptr))) {
		/* keep a more specific status if one was already recorded */
		if (ct->ct_error.re_status == RPC_SUCCESS)
			ct->ct_error.re_status = RPC_CANTENCODEARGS;
		(void)xdrrec_endofrecord(xdrs, TRUE);
		return (ct->ct_error.re_status);
	}
	if (! xdrrec_endofrecord(xdrs, shipnow))
		return (ct->ct_error.re_status = RPC_CANTSEND);
	if (! shipnow)
		return (RPC_SUCCESS);
	/*
	 * Hack to provide rpc-based message passing
	 */
	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
		return(ct->ct_error.re_status = RPC_TIMEDOUT);
	}


	/*
	 * Keep receiving until we get a valid transaction id
	 */
	xdrs->x_op = XDR_DECODE;
	while (TRUE) {
		/* reset the reply header; results are decoded separately below */
		reply_msg.acpted_rply.ar_verf = _null_auth;
		reply_msg.acpted_rply.ar_results.where = NULL;
		reply_msg.acpted_rply.ar_results.proc = xdr_void;
		if (! xdrrec_skiprecord(xdrs))
			return (ct->ct_error.re_status);
		/* now decode and validate the response header */
		if (! xdr_replymsg(xdrs, &reply_msg)) {
			/* no error recorded: skip this record and try the next */
			if (ct->ct_error.re_status == RPC_SUCCESS)
				continue;
			return (ct->ct_error.re_status);
		}
		if (reply_msg.rm_xid == x_id)
			break;
	}

	/*
	 * process header
	 */
	_seterr_reply(&reply_msg, &(ct->ct_error));
	if (ct->ct_error.re_status == RPC_SUCCESS) {
		if (! AUTH_VALIDATE(h->cl_auth, &reply_msg.acpted_rply.ar_verf)) {
			ct->ct_error.re_status = RPC_AUTHERROR;
			ct->ct_error.re_why = AUTH_INVALIDRESP;
		} else if (! (*xdr_results)(xdrs, results_ptr)) {
			if (ct->ct_error.re_status == RPC_SUCCESS)
				ct->ct_error.re_status = RPC_CANTDECODERES;
		}
		/* free verifier ... */
		if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
			xdrs->x_op = XDR_FREE;
			(void)xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
		}
	}  /* end successful completion */
	else {
		/* maybe our credentials need to be refreshed ... */
		if (refreshes-- && AUTH_REFRESH(h->cl_auth))
			goto call_again;
	}  /* end of unsuccessful completion */
	return (ct->ct_error.re_status);
}

XDR

SunRPC で、整列化、非整列化(メモリの確保)、メモリの解放を行う。

rpcgenにより生成された手続きの例

   1:	/*
   2:	 * Please do not edit this file.
   3:	 * It was generated using rpcgen.
   4:	 */
   5:	
   6:	#include "dirlist.h"
   7:	
   8:	bool_t
   9:	xdr_delist (XDR *xdrs, delist *objp)
  10:	{
  11:	        register int32_t *buf;
  12:	
  13:	         if (!xdr_string (xdrs, &objp->del_name, ~0))
  14:	                 return FALSE;
  15:	         if (!xdr_pointer (xdrs, (char **)&objp->del_next, sizeof (delist), (xdrproc_t) xdr_delist))
  16:	                 return FALSE;
  17:	        return TRUE;
  18:	}
  19:	
  20:	bool_t
  21:	xdr_dirlist_res (XDR *xdrs, dirlist_res *objp)
  22:	{
  23:	        register int32_t *buf;
  24:	
  25:	         if (!xdr_int (xdrs, &objp->dlr_errno))
  26:	                 return FALSE;
  27:	        switch (objp->dlr_errno) {
  28:	        case 0:
  29:	                 if (!xdr_pointer (xdrs, (char **)&objp->dirlist_res_u.dlr_head, sizeof (delist), (xdrproc_t) xdr_delist))
  30:	                         return FALSE;
  31:	                break;
  32:	        default:
  33:	                break;
  34:	        }
  35:	        return TRUE;
  36:	}
データ構造とアルゴリズムが一致している。

xdr_int()とxdr_pointer()


xdr_int(xdrs, ip)
XDR *xdrs;
int *ip;
       A  filter primitive that translates between C inte-
       gers and their external representations.  This rou-
       tine returns one if it succeeds, zero otherwise.

xdr_pointer(xdrs, objpp, objsize, xdrobj)
XDR *xdrs;
char **objpp;
u_int objsize;
xdrproc_t xdrobj;

       Like xdr_reference() except that it serializes NULL
       pointers, whereas xdr_reference() does not.   Thus,
       xdr_pointer()  can  represent recursive data struc-
       tures, such as binary trees or linked lists.

xdr_reference(xdrs, pp, size, proc)
XDR *xdrs;
char **pp;
u_int size;
xdrproc_t proc;

       A primitive that provides  pointer  chasing  within
       structures.  The parameter pp is the address of the
       pointer; size is the sizeof the structure that  *pp
       points  to;  and proc is an XDR procedure that fil-
       ters the structure  between  its  C  form  and  its
       external  representation.  This routine returns one
       if it succeeds, zero otherwise.

       Warning: this  routine  does  not  understand  NULL
       pointers. Use xdr_pointer() instead.

xdr.c
/*
 * XDR integers
 */
bool_t
xdr_int(xdrs, ip)
        XDR *xdrs;
        int *ip;
{
        if (sizeof (int) == sizeof (long)) {
                return (xdr_long(xdrs, (long *)ip));
        } else {
                return (xdr_short(xdrs, (short *)ip));
        }
}

/*
 * XDR long integers
 * Equivalent to xdr_u_long; spelled out here to avoid one procedure call.
 *
 * Encodes from *lp, decodes into *lp, and has nothing to do when freeing.
 * Any other x_op value yields FALSE.
 */
bool_t
xdr_long(xdrs, lp)
        register XDR *xdrs;
        long *lp;
{
        switch (xdrs->x_op) {
        case XDR_ENCODE:
                return (XDR_PUTLONG(xdrs, lp));

        case XDR_DECODE:
                return (XDR_GETLONG(xdrs, lp));

        case XDR_FREE:
                return (TRUE);

        default:
                return (FALSE);
        }
}

XDR構造体

抽象クラス。関数のポインタを含む。

xdr.h

enum xdr_op {
        XDR_ENCODE=0,
        XDR_DECODE=1,
        XDR_FREE=2
};
typedef bool_t (*xdrproc_t)();

/*
 * The XDR handle.
 * Contains operation which is being applied to the stream,
 * an operations vector for the particular implementation (e.g. see xdr_mem.c),
 * and two private fields for the use of the particular implementation.
 */
typedef struct {
        enum xdr_op     x_op;           /* operation; fast additional param */
        struct xdr_ops {
                bool_t  (*x_getlong)(); /* get a long from underlying stream */
                bool_t  (*x_putlong)(); /* put a long to " */
                bool_t  (*x_getbytes)();/* get some bytes from " */
                bool_t  (*x_putbytes)();/* put some bytes to " */
                u_int   (*x_getpostn)();/* returns bytes off from beginning */
                bool_t  (*x_setpostn)();/* lets you reposition the stream */
                long *  (*x_inline)();  /* buf quick ptr to buffered data */
                void    (*x_destroy)(); /* free privates of this xdr_stream */
        } *x_ops;                       /* e.g. &xdrrec_ops for record streams */
        caddr_t         x_public;       /* users' data */
        caddr_t         x_private;      /* pointer to private data */
        caddr_t         x_base;         /* private used for position info */
        int             x_handy;        /* extra private word */
} XDR;

/*
 * Dispatch macros: call the stream's own implementation through the
 * operations vector, passing the handle as the first argument.
 * xdrs is expanded twice, so it must not have side effects.
 */
#define XDR_GETLONG(xdrs, longp)                        \
        (*(xdrs)->x_ops->x_getlong)(xdrs, longp)
#define XDR_PUTLONG(xdrs, longp)                        \
        (*(xdrs)->x_ops->x_putlong)(xdrs, longp)

#define XDR_GETBYTES(xdrs, addr, len)                   \
        (*(xdrs)->x_ops->x_getbytes)(xdrs, addr, len)
#define XDR_PUTBYTES(xdrs, addr, len)                   \
        (*(xdrs)->x_ops->x_putbytes)(xdrs, addr, len)

/*
 * Inline get/put of one long through a buffer pointer.
 * Both macros post-increment buf and convert between network and host
 * byte order with ntohl()/htonl().
 * The value argument of IXDR_PUT_LONG is parenthesized so that an
 * expression argument is converted as a whole, not just its first operand.
 */
#define IXDR_GET_LONG(buf)              ((long)ntohl((u_long)*(buf)++))
#define IXDR_PUT_LONG(buf, v)           (*(buf)++ = (long)htonl((u_long)(v)))

TCP用のXDRのサブクラス

xdr_rec.c
static u_int    fix_buf_size();

static bool_t   xdrrec_getlong();
static bool_t   xdrrec_putlong();
static bool_t   xdrrec_getbytes();
static bool_t   xdrrec_putbytes();
static u_int    xdrrec_getpos();
static bool_t   xdrrec_setpos();
static long *   xdrrec_inline();
static void     xdrrec_destroy();

/*
 * Operations vector for record streams; xdrrec_create() stores its
 * address in XDR.x_ops.
 * The initializers are positional, so their order must follow the member
 * order of struct xdr_ops.
 */
static struct  xdr_ops xdrrec_ops = {
        xdrrec_getlong,
        xdrrec_putlong,
        xdrrec_getbytes,
        xdrrec_putbytes,
        xdrrec_getpos,
        xdrrec_setpos,
        xdrrec_inline,
        xdrrec_destroy
};


void
xdrrec_create(xdrs, sendsize, recvsize, tcp_handle, readit, writeit)
        register XDR *xdrs;
        register u_int sendsize;
        register u_int recvsize;
        caddr_t tcp_handle;
        int (*readit)();  /* like read, but pass it a tcp_handle, not sock */
        int (*writeit)();  /* like write, but pass it a tcp_handle, not sock */
{
        register RECSTREAM *rstrm =
                (RECSTREAM *)mem_alloc(sizeof(RECSTREAM));
...
        xdrs->x_ops = &xdrrec_ops;
        xdrs->x_private = (caddr_t)rstrm;
        rstrm->tcp_handle = tcp_handle;
        rstrm->writeit = writeit;
}

/*
 * The routines defined below are the xdr ops which will go into the
 * xdr handle filled in by xdrrec_create.
 */

/*
 * x_getlong operation of a record stream.
 * Reads one long from the input side, converts it with ntohl() and
 * stores it in *lp.
 * Returns TRUE on success, FALSE when xdrrec_getbytes() fails.
 */
static bool_t
xdrrec_getlong(xdrs, lp)
        XDR *xdrs;
        long *lp;
{
        register RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
        register long *buflp = (long *)(rstrm->in_finger);
        long mylong;

        /* first try the inline, fast case */
        /*
         * The value is read straight out of the input buffer when both
         * fbtbc and the space up to in_boundry cover a whole long.
         * NOTE(review): the pointers are cast to int before subtracting;
         * where pointers are wider than int this truncates -- confirm
         * for the target platform.
         */
        if ((rstrm->fbtbc >= sizeof(long)) &&
                (((int)rstrm->in_boundry - (int)buflp) >= sizeof(long))) {
                *lp = (long)ntohl((u_long)(*buflp));
                rstrm->fbtbc -= sizeof(long);
                rstrm->in_finger += sizeof(long);
        } else {
                /* slow case: let xdrrec_getbytes() fetch the bytes */
                if (! xdrrec_getbytes(xdrs, (caddr_t)&mylong, sizeof(long)))
                        return (FALSE);
                *lp = (long)ntohl((u_long)mylong);
        }
        return (TRUE);
}

/*
 * x_putlong operation of a record stream.
 * Converts *lp with htonl() and stores it at the current output position,
 * advancing out_finger by sizeof(long).
 * Returns TRUE on success, FALSE when flush_out() fails.
 */
static bool_t
xdrrec_putlong(xdrs, lp)
        XDR *xdrs;
        long *lp;
{
        register RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
        register long *dest_lp = ((long *)(rstrm->out_finger));

        /* advance first, then check whether the long still fits */
        if ((rstrm->out_finger += sizeof(long)) > rstrm->out_boundry) {
                /*
                 * this case should almost never happen so the code is
                 * inefficient
                 */
                /* undo the advance, flush, then retry at the new position */
                rstrm->out_finger -= sizeof(long);
                rstrm->frag_sent = TRUE;
                if (! flush_out(rstrm, FALSE))
                        return (FALSE);
                dest_lp = ((long *)(rstrm->out_finger));
                rstrm->out_finger += sizeof(long);
        }
        *dest_lp = (long)htonl((u_long)(*lp));
        return (TRUE);
}
...

ポインタ

xdr_reference.c
#define LASTUNSIGNED	((u_int)0-1)
/*
 * Pointer chasing: apply proc to the object that *pp points to.
 *
 * When *pp is NULL:
 *   XDR_FREE   - nothing to release, returns TRUE;
 *   XDR_DECODE - a zero-filled object of `size' bytes is allocated and
 *                stored in *pp (FALSE when allocation fails);
 *   otherwise  - proc is called with the NULL pointer as it is.
 * After proc has run in XDR_FREE mode the object is released and *pp is
 * reset to NULL.  Returns whatever proc returned.
 */
bool_t
xdr_reference(xdrs, pp, size, proc)
	register XDR *xdrs;
	caddr_t *pp;		/* the pointer to work on */
	u_int size;		/* size of the object pointed to */
	xdrproc_t proc;		/* xdr routine to handle the object */
{
	register caddr_t obj = *pp;
	register bool_t ok;

	if (obj == NULL) {
		if (xdrs->x_op == XDR_FREE) {
			return (TRUE);
		}
		if (xdrs->x_op == XDR_DECODE) {
			obj = (caddr_t) mem_alloc(size);
			*pp = obj;
			if (obj == NULL) {
				(void) fprintf(stderr,
				    "xdr_reference: out of memory\n");
				return (FALSE);
			}
			bzero(obj, (int)size);
		}
	}

	ok = (*proc)(xdrs, obj, LASTUNSIGNED);

	if (xdrs->x_op != XDR_FREE) {
		return (ok);
	}

	/* freeing: release the object itself once proc is done with it */
	mem_free(obj, size);
	*pp = NULL;
	return (ok);
}

/*
 * Like xdr_reference(), but a boolean "object present" flag travels in
 * front of the object, so NULL pointers can be represented.
 *
 * The flag starts as (*objpp != NULL) and is passed through xdr_bool().
 * If the flag is false afterwards, *objpp is set to NULL and TRUE is
 * returned; otherwise the object itself is handled by xdr_reference().
 */
bool_t
xdr_pointer(xdrs,objpp,obj_size,xdr_obj)
	register XDR *xdrs;
	char **objpp;
	u_int obj_size;
	xdrproc_t xdr_obj;
{
	bool_t has_obj = (*objpp != NULL);

	if (! xdr_bool(xdrs, &has_obj)) {
		return (FALSE);
	}
	if (has_obj) {
		return (xdr_reference(xdrs, objpp, obj_size, xdr_obj));
	}
	*objpp = NULL;
	return (TRUE);
}

文字列

xdr.c
/*
 * XDR null terminated ASCII strings
 * xdr_string deals with "C strings" - arrays of bytes that are
 * terminated by a NULL character.  The parameter cpp references a
 * pointer to storage; If the pointer is null, then the necessary
 * storage is allocated.  The last parameter is the max allowed length
 * of the string as specified by a protocol.
 *
 * Returns TRUE on success.  Returns FALSE when the length cannot be
 * transferred, when it exceeds maxsize, when there is no string to
 * encode (*cpp == NULL), when size + 1 does not fit in a u_int, or
 * when memory cannot be allocated while decoding.
 */
bool_t
xdr_string(xdrs, cpp, maxsize)
	register XDR *xdrs;
	char **cpp;
	u_int maxsize;
{
	register char *sp = *cpp;  /* sp is the actual string pointer */
	u_int size = 0;
	u_int nodesize;

	/*
	 * first deal with the length since xdr strings are counted-strings
	 */
	switch (xdrs->x_op) {
	case XDR_FREE:
		if (sp == NULL) {
			return(TRUE);	/* already free */
		}
		/* fall through... */
	case XDR_ENCODE:
		if (sp == NULL) {
			return (FALSE);	/* no string: strlen() must not see NULL */
		}
		size = strlen(sp);
		break;
	case XDR_DECODE:
		break;			/* the length is read from the stream below */
	}
	if (! xdr_u_int(xdrs, &size)) {
		return (FALSE);
	}
	if (size > maxsize) {
		return (FALSE);
	}
	nodesize = size + 1;
	if (nodesize == 0) {
		/*
		 * size + 1 wrapped around (possible when maxsize is ~0).
		 * Treat it as an error rather than reporting success
		 * without having transferred the string bytes.
		 */
		return (FALSE);
	}

	/*
	 * now deal with the actual bytes
	 */
	switch (xdrs->x_op) {

	case XDR_DECODE:
		if (sp == NULL)
			*cpp = sp = (char *)mem_alloc(nodesize);
		if (sp == NULL) {
			(void) fprintf(stderr, "xdr_string: out of memory\n");
			return (FALSE);
		}
		sp[size] = 0;
		/* fall into ... */

	case XDR_ENCODE:
		return (xdr_opaque(xdrs, sp, size));

	case XDR_FREE:
		mem_free(sp, nodesize);
		*cpp = NULL;
		return (TRUE);
	}
	return (FALSE);
}

サーバ側スタブ

SVCXPRT構造体

クライアント側の CLIENT 構造体に対向するもの。
/*
 * Server side transport handle
 * Like CLIENT on the client side, it carries an operations vector
 * (xp_ops) plus private data of the transport implementation.
 */
typedef struct {
	int		xp_sock;
	u_short		xp_port;	 /* associated port number */
	struct xp_ops {
	    bool_t	(*xp_recv)();	 /* receive incoming requests */
	    enum xprt_stat (*xp_stat)(); /* get transport status */
	    bool_t	(*xp_getargs)(); /* get arguments */
	    bool_t	(*xp_reply)();	 /* send reply */
	    bool_t	(*xp_freeargs)();/* free mem allocated for args */
	    void	(*xp_destroy)(); /* destroy this struct */
	} *xp_ops;
	int		xp_addrlen;	 /* length of remote address */
	struct sockaddr_in xp_raddr;	 /* remote address */
	struct opaque_auth xp_verf;	 /* raw response verifier */
	caddr_t		xp_p1;		 /* private */
	caddr_t		xp_p2;		 /* private */
} SVCXPRT;

/* Decode the call arguments by dispatching to the transport's xp_getargs. */
#define svc_getargs(xprt, xargs, argsp)			\
	(*(xprt)->xp_ops->xp_getargs)((xprt), (xargs), (argsp))

サーバ側スタブの例

   1:	/*
   2:	 * Please do not edit this file.
   3:	 * It was generated using rpcgen.
   4:	 */
   5:	
   6:	#include "dirlist.h"
   7:	#include <stdio.h>
   8:	#include <stdlib.h>
   9:	#include <rpc/pmap_clnt.h>
  10:	#include <string.h>
  11:	#include <memory.h>
  12:	#include <sys/socket.h>
  13:	#include <netinet/in.h>
  14:	
  15:	#ifndef SIG_PF
  16:	#define SIG_PF void(*)(int)
  17:	#endif
  18:	
  19:	static void
  20:	dirlist_prog_1(struct svc_req *rqstp, register SVCXPRT *transp)
  21:	{
  22:	        union {
  23:	                char *dirlist_1_arg;
  24:	        } argument;
  25:	        char *result;
  26:	        xdrproc_t _xdr_argument, _xdr_result;
  27:	        char *(*local)(char *, struct svc_req *);
  28:	
  29:	        switch (rqstp->rq_proc) {
  30:	        case NULLPROC:
  31:	                (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
  32:	                return;
  33:	
  34:	        case DIRLIST:
  35:	                _xdr_argument = (xdrproc_t) xdr_wrapstring;
  36:	                _xdr_result = (xdrproc_t) xdr_dirlist_res;
  37:	                local = (char *(*)(char *, struct svc_req *)) dirlist_1_svc;
  38:	                break;
  39:	
  40:	        default:
  41:	                svcerr_noproc (transp);
  42:	                return;
  43:	        }
  44:	        memset ((char *)&argument, 0, sizeof (argument));
  45:	        if (!svc_getargs (transp, _xdr_argument, (caddr_t) &argument)) {
  46:	                svcerr_decode (transp);
  47:	                return;
  48:	        }
  49:	        result = (*local)((char *)&argument, rqstp);
  50:	        if (result != NULL && !svc_sendreply(transp, _xdr_result, result)) {
  51:	                svcerr_systemerr (transp);
  52:	        }
  53:	        if (!svc_freeargs (transp, _xdr_argument, (caddr_t) &argument)) {
  54:	                fprintf (stderr, "%s", "unable to free arguments");
  55:	                exit (1);
  56:	        }
  57:	        return;
  58:	}
  59:	
  60:	int
  61:	main (int argc, char **argv)
  62:	{
  63:	        register SVCXPRT *transp;
  64:	
  65:	        pmap_unset (DIRLIST_PROG, DIRLIST_VERSION);
  66:	
  67:	        transp = svcudp_create(RPC_ANYSOCK);
  68:	        if (transp == NULL) {
  69:	                fprintf (stderr, "%s", "cannot create udp service.");
  70:	                exit(1);
  71:	        }
  72:	        if (!svc_register(transp, DIRLIST_PROG, DIRLIST_VERSION, dirlist_prog_1, IPPROTO_UDP)) {
  73:	                fprintf (stderr, "%s", "unable to register (DIRLIST_PROG, DIRLIST_VERSION, udp).");
  74:	                exit(1);
  75:	        }
  76:	
  77:	        transp = svctcp_create(RPC_ANYSOCK, 0, 0);
  78:	        if (transp == NULL) {
  79:	                fprintf (stderr, "%s", "cannot create tcp service.");
  80:	                exit(1);
  81:	        }
  82:	        if (!svc_register(transp, DIRLIST_PROG, DIRLIST_VERSION, dirlist_prog_1, IPPROTO_TCP)) {
  83:	                fprintf (stderr, "%s", "unable to register (DIRLIST_PROG, DIRLIST_VERSION, tcp).");
  84:	                exit(1);
  85:	        }
  86:	
  87:	        svc_run ();
  88:	        fprintf (stderr, "%s", "svc_run returned");
  89:	        exit (1);
  90:	        /* NOTREACHED */
  91:	}

サーバ側スタブのためのライブラリ関数

svc_register(xprt, prognum, versnum, dispatch, protocol)
SVCXPRT *xprt;
u_long prognum, versnum;
void (*dispatch) ();
u_long protocol;

       Associates  prognum  and  versnum  with the service
       dispatch procedure, dispatch.  If protocol is zero,
       the service is not registered with the portmap ser-
       vice.  If protocol is non-zero, then a  mapping  of
       the     triple     [prognum,versnum,protocol]    to
       xprt->xp_port is established with the local portmap
       service (generally protocol is zero, IPPROTO_UDP or
       IPPROTO_TCP ).  The procedure dispatch has the fol-
       lowing form:
	  dispatch(request, xprt)
	  struct svc_req *request;
	  SVCXPRT *xprt;

       The  svc_register()  routine returns one if it suc-
       ceeds, and zero otherwise.

svc_run()

       This  routine  never  returns.  It  waits  for  RPC
       requests  to arrive, and calls the appropriate ser-
       vice procedure using svc_getreq() when one arrives.
       This  procedure  is  usually waiting for a select()
       system call to return.

svc_sendreply(xprt, outproc, out)
SVCXPRT *xprt;
xdrproc_t outproc;
char *out;

       Called by an RPC service's dispatch routine to send
       the results of a remote procedure call.  The param-
       eter xprt is  the  request's  associated  transport
       handle; outproc is the XDR routine which is used to
       encode the results; and out is the address  of  the
       results.   This routine returns one if it succeeds,
       zero otherwise.

svc_run()

svc_run.c
/*
 * Server main loop.
 * Waits in select(), without a timeout, for input on the descriptors in
 * svc_fdset and hands the ready set to svc_getreqset().
 * Returns only when select() fails with an error other than EINTR.
 */
void
svc_run()
{
	fd_set readfds;
	extern int errno;

	for (;;) {
		/* start every round from the full set of service descriptors */
		readfds = svc_fdset;
		/* only readability is of interest: the other two sets are null */
		switch (select(_rpc_dtablesize(), &readfds, (fd_set *)0,
			       (fd_set *)0, (struct timeval *)0)) {
		case -1:
			if (errno == EINTR) {
				continue;	/* interrupted by a signal: retry */
			}
			perror("svc_run: - select failed");
			return;
		case 0:
			continue;
		default:
			svc_getreqset(&readfds);
			break;
		}
	}
}
全部のプログラムに対して select() で要求を待つ。

svc_getreqset()

/*
 * Serve the requests pending on the descriptors set in *readfds.
 *
 * For every set bit the transport is looked up in xports[].  Each
 * received call is authenticated and then matched against the registered
 * services on the svc_head list.  A (program, version) match calls the
 * service's dispatch routine; otherwise an error reply is sent
 * (svcerr_progvers when only the version is unknown, svcerr_noprog when
 * the program is unknown).  A transport reported as XPRT_DIED is
 * destroyed.
 */
void
svc_getreqset(readfds)
	fd_set *readfds;
{
    enum xprt_stat stat;
    struct rpc_msg msg;
    int prog_found;
    u_long low_vers;
    u_long high_vers;
    struct svc_req r;
    register SVCXPRT *xprt;
    register u_long mask;
    register int bit;
    register u_long *maskp;
    register int setsize;
    register int sock;
    /*
     * One buffer, split into three areas: call credentials, call
     * verifier, and the area handed out as r.rq_clntcred.
     */
    char cred_area[2*MAX_AUTH_BYTES + RQCRED_SIZE];
    msg.rm_call.cb_cred.oa_base = cred_area;
    msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
    r.rq_clntcred = &(cred_area[2*MAX_AUTH_BYTES]);


    setsize = _rpc_dtablesize();	
    maskp = (u_long *)readfds->fds_bits;
    /* walk the descriptor set one mask word (NFDBITS descriptors) at a time */
    for (sock = 0; sock < setsize; sock += NFDBITS) {
	/*
	 * ffs() yields the 1-based index of the lowest set bit (0 ends the
	 * loop); that bit is cleared again before the next round.
	 * NOTE(review): ffs() and the shift work on int while mask is
	 * u_long -- confirm this is safe where u_long is wider than int.
	 */
	for (mask = *maskp++; bit = ffs(mask); mask ^= (1 << (bit - 1))) {
	    /* sock has input waiting */
	    xprt = xports[sock + bit - 1];
	    /* now receive msgs from xprt (support batch calls) */
	    do {
		if (SVC_RECV(xprt, &msg)) {

		    /* now find the exported program and call it */
		    register struct svc_callout *s;
		    enum auth_stat why;

		    r.rq_xprt = xprt;
		    r.rq_prog = msg.rm_call.cb_prog;
		    r.rq_vers = msg.rm_call.cb_vers;
		    r.rq_proc = msg.rm_call.cb_proc;
		    r.rq_cred = msg.rm_call.cb_cred;
		    /* first authenticate the message */
		    if ((why= _authenticate(&r, &msg)) != AUTH_OK) {
			    svcerr_auth(xprt, why);
			    goto call_done;
		    }
		    /* now match message with a registered service*/
		    prog_found = FALSE;
		    /* start from the widest range so any registered version narrows it */
		    low_vers = 0 - 1;
		    high_vers = 0;
		    for (s = svc_head; s != NULL_SVC; s = s->sc_next) {
			if (s->sc_prog == r.rq_prog) {
			    if (s->sc_vers == r.rq_vers) {
				(*s->sc_dispatch)(&r, xprt);
				    goto call_done;
			    }  /* found correct version */
			    prog_found = TRUE;
			    if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			    if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
			}   /* found correct program */
		    }
		    /*
		     * if we got here, the program or version
		     * is not served ...
		     */
		    if (prog_found)
			svcerr_progvers(xprt,
			low_vers, high_vers);
		    else
			svcerr_noprog(xprt);
		    /* Fall through to ... */
		}
call_done:
		/* also reached when SVC_RECV() returned false */
		if ((stat = SVC_STAT(xprt)) == XPRT_DIED){
		    SVC_DESTROY(xprt);
		    break;
		}
	    } while (stat == XPRT_MOREREQS);
	}
    }
}

■グループ通信

グループ通信で何が難しいか

◆設計の論点

並列処理では、クローズが多い。 IP MBone のように、オープンでないとどうしようもないものもある。

対等だと、クラッシュに強い(single point of failure がない)が、何をするにもすぐ投票が必要になる。

◆メンバシップの管理

グループサーバを持つか持たないか。

難しいのは、メンバのプロセスがクラッシュした時。 何も言わずにグループから抜ける。

◆アドレス指定

◆通信プリミティブ

one-to-oneの通信と同じにしたい。障害がなければ簡単だが、障害が起こりうると難しくなる。

◆atomic broadcastの簡単なアルゴリズム

◆メッセージの到着順

one-to-one でも問題があるのに、group 通信だとさらに複雑。

total ordering は、グループ通信では実現が難しすぎる。弱いものが実現される。

プロセスが複数のグループに属していると、1つのメッセージについての順序だけ考えていてはすまなくなる。

◆スケーラビリティ

数が増えた時に動くか。

◆冗長性

ネットワークのダウンに対応するには、ネットワークを冗長にしないといけない。冗長性があると、メッセージの重複をうまく扱う必要がでてくる。

下手をすると、メッセージが増幅しながらいつまでもぐるぐる回る。

◆例 ISIS システム

分散アプリケーションのためのツールキット。 コーネル大学。

◆ISISの同期性(synchrony)

同期システム(synchronous system)
イベント(マルチキャストを含む)は厳密に逐次的に発生し、オーバーラップしない。 各イベントの完了までの時間は0。 実現不可能。
緩やかな同期システム(loosely synchronous system)
イベントは有限時間で届く。
仮想的な同期システム(virtually synchronous system)
因果関係が成り立つようにがんばる。(並行なものは、手抜き。)

◆ISISの通信プリミティブ

ABCAST
緩やかな同期
CBCAST
仮想的な同期
GBCAST
仮想的な同期(グループのメンバシップ用)

最初の実現:2相コミットによる ABCAST 。重すぎる。

  1. 送信者は、タイムスタンプを含むメッセージを全てのメンバに送る。
  2. 各メンバは、それまでに送信または受信したどのメッセージのタイムスタンプよりも大きいタイムスタンプを選び、最初の送信者に返す。
  3. 送信者は、全ての返事を受け取ると、最大のものを選び、メンバにコミット・メッセージを送る。コミット・メッセージは、タイムスタンプの順に届けられる。
CBCASTの実現

メンバの数:n

各プロセスは、グループごとに、長さnのベクトルVを持つ。 i番目の要素は、プロセスiから正しい順序で受信したメッセージの最後の番号。 0に初期化される。

  1. プロセスは、送信すべきメッセージがあると、 ベクトルの自分のスロットを増加させ、メッセージの一部として送信する。
  2. メッセージのベクトルをV、メモリ中のベクトルをLとする。 メッセージがjから送られてきたものとすると、次の条件の時に受け取る: $V_j = L_j + 1$ かつ、すべての $i \neq j$ について $V_i \le L_i$。 そうでないものは、この条件が満たされるまでバッファリングされる。

M1が届くまでM2の配送を遅延。

図? ISISでのCBCASTの実現。

◆ニュースシステム

マルチキャストの実装。
↑[もどる] ←[1月20日] ・[1月27日] →[2月3日]
Last updated: 2006/01/28 20:37:46
Yasushi Shinjo / <yas@is.tsukuba.ac.jp>