並行分散ソフトウェア/並列分散ソフトウェア
電子・情報工学系
新城 靖
<yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.cs.tsukuba.ac.jp/~yas/sie/pdsoft-2005/2006-01-27
あるいは、次のページから手繰っていくこともできます。
http://www.cs.tsukuba.ac.jp/~yas/sie/
http://www.cs.tsukuba.ac.jp/~yas/
SunRPC のソースコード: ftp://playground.sun.com/pub/rpc/rpcsrc_40.tar.Z
/* Authentication blob: a flavor tag followed by opaque, flavor-specific bytes. */
struct opaque_auth {
enum_t oa_flavor; /* flavor of auth */
opaque oa_data<> ; /* variable-length opaque data (no bound given here) */
};
/* Body of a rejected reply; the discriminant rj_stat selects the arm. */
union rejected_reply
switch(enum reject_stat rj_stat)
{
case RPC_MISMATCH:
RJ_versions_t RJ_versions; /* present when rj_stat is RPC_MISMATCH */
case AUTH_ERROR:
enum auth_stat RJ_why; /* present when rj_stat is AUTH_ERROR */
};
/* Body of an accepted reply; the discriminant ar_stat selects the arm. */
union accepted_reply_body
switch(enum accept_stat ar_stat)
{
case SUCCESS:
/* End of the header. */
/* The reply message follows. */
case PROG_UNAVAIL:
void;
case PROG_MISMATCH:
var_vers ar_vers; /* present when ar_stat is PROG_MISMATCH */
case PROC_UNAVAIL:
void;
case GARBAGE_ARGS:
void;
case SYSTEM_ERR:
void;
};
/* An accepted reply: a verifier followed by the status-dependent body. */
struct accepted_reply {
opaque_auth ar_verf;
accepted_reply_body ar_body;
};
/* Reply body; rp_stat tells whether the call was accepted or denied. */
union reply_body
switch(enum reply_stat rp_stat)
{
case MSG_ACCEPTED:
struct accepted_reply RP_ar; /* present when rp_stat is MSG_ACCEPTED */
case MSG_DENIED:
struct rejected_reply RP_dr; /* present when rp_stat is MSG_DENIED */
};
/*
* Header of a call message: RPC version, then program, version and
* procedure numbers, then credential and verifier.
*/
struct call_body {
u_long cb_rpcvers; /* must be equal to two */
u_long cb_prog;
u_long cb_vers;
u_long cb_proc;
struct opaque_auth cb_cred;
struct opaque_auth cb_verf; /* protocol specific - provided by client */
/* End of the header. */
/* The body of the request message follows. */
};
/* Message body; rm_direction tells whether this is a call or a reply. */
union rpc_msg_body
switch(msg_type rm_direction){
case CALL:
struct call_body RM_cmb; /* present when rm_direction is CALL */
case REPLY:
struct reply_body RM_rmb; /* present when rm_direction is REPLY */
};
/* A complete RPC message: transaction id followed by the call/reply body. */
struct rpc_msg {
u_long rm_xid; /* transaction id */
rpc_msg_body rm_body;
};
CLIENT *
clnt_create(host, prog, vers, proto)
char *host;
u_long prog, vers;
char *proto;
Generic client creation routine. host identifies
the name of the remote host where the server is
located. proto indicates which kind of transport
protocol to use. The currently supported values for
this field are "udp" and "tcp". Default timeouts
are set, but can be modified using clnt_control().
Warning: Using UDP has its shortcomings. Since
UDP-based RPC messages can only hold up to 8 Kbytes
of encoded data, this transport cannot be used for
procedures that take large arguments or return huge
results.
clnt_generic.c
/*
 * Generic client creation: takes (hostname, program-number,
 * version-number, protocol-name) and returns a client handle.
 * Default options are set, which the user can change using the rpc
 * equivalent of ioctl()'s.
 *
 * hostname is resolved with gethostbyname() and proto with
 * getprotobyname(); the resulting protocol number selects the UDP or
 * the TCP creation routine.
 *
 * This is an abridged listing: "..." marks code that is not shown
 * (including the declarations of h, p, client, tv and sock).
 */
CLIENT *
clnt_create(hostname, prog, vers, proto)
	char *hostname;
	unsigned prog;
	unsigned vers;
	char *proto;
{
	struct sockaddr_in sin;
	...
	h = gethostbyname(hostname);
	...
	/* copy the resolved address into the socket address */
	bcopy(h->h_addr, (char*)&sin.sin_addr, h->h_length);
	...
	p = getprotobyname(proto);
	switch (p->p_proto) {
	case IPPROTO_UDP:
		...
		client = clntudp_create(&sin, prog, vers, tv, &sock);
		...
		break;
	case IPPROTO_TCP:
		/* 0, 0: let the TCP layer use its default buffer sizes */
		client = clnttcp_create(&sin, prog, vers, &sock, 0, 0);
		...
		break;
	}
	return (client);
}
/*
* Client rpc handle.
* Created by individual implementations, see e.g. rpc_udp.c.
* Client is responsible for initializing auth, see e.g. auth_none.c.
*
* Transport-specific behavior is reached through cl_ops, a table of
* function pointers; cl_private holds the implementation's own data.
*/
typedef struct {
AUTH *cl_auth; /* authenticator */
struct clnt_ops {
enum clnt_stat (*cl_call)(); /* call remote procedure */
void (*cl_abort)(); /* abort a call */
void (*cl_geterr)(); /* get specific error code */
bool_t (*cl_freeres)(); /* frees results */
void (*cl_destroy)();/* destroy this structure */
bool_t (*cl_control)();/* the ioctl() of rpc */
} *cl_ops;
caddr_t cl_private; /* private stuff */
} CLIENT;
/*
* Convenience macros: each one calls the matching entry of the handle's
* cl_ops table and passes the handle itself as the first argument.
* The handle expression appears twice in every expansion, so it is
* evaluated twice.
*/
#define clnt_call(rh, proc, xargs, argsp, xres, resp, secs) \
((*(rh)->cl_ops->cl_call)(rh, proc, xargs, argsp, xres, resp, secs))
#define clnt_abort(rh) ((*(rh)->cl_ops->cl_abort)(rh))
#define clnt_geterr(rh,errp) ((*(rh)->cl_ops->cl_geterr)(rh, errp))
#define clnt_freeres(rh,xres,resp) ((*(rh)->cl_ops->cl_freeres)(rh,xres,resp))
#define clnt_control(cl,rq,in) ((*(cl)->cl_ops->cl_control)(cl,rq,in))
#define clnt_destroy(rh) ((*(rh)->cl_ops->cl_destroy)(rh))

図? SunRPC のクライアント側で使う構造体

図? CLIENT 構造体と抽象クラス
C++ 的な表記
/*
 * The CLIENT handle rewritten as an abstract class: the function
 * pointers of struct clnt_ops become virtual member functions and the
 * explicit handle argument becomes the implicit "this".
 * No member is listed here for the cl_destroy slot.
 */
class client
{
public:
struct AUTH *cl_auth;
virtual enum clnt_stat
cl_call(u_long proc, xdrproc_t xargs,
caddr_t argsp, xdrproc_t xres,
caddr_t resp,struct timeval timeout);
virtual void clnt_abort(void);
virtual void clnt_geterr(struct rpc_err *errp);
virtual bool_t clnt_freeres(xdrproc_t xres, caddr_t resp);
virtual bool_t clnt_control(u_int request, char *info);
};
class client_tcp : client {
int ct_sock;
bool_t ct_closeit;
struct timeval ct_bool;
wait_t ct_waitset; /* wait set by clnt_control? */
struct sockaddr_in ct_addr;
struct rpc_err ct_error;
char ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
u_int ct_mpos; /* pos after marshal */
XDR ct_xdrs;
};
Java風の表記
/*
 * The CLIENT handle rewritten as a Java abstract class: each operation
 * of the handle is an abstract method that a transport-specific
 * subclass has to implement.
 */
public abstract class Client
{
AUTH cl_auth;
public abstract clnt_stat
cl_call(long proc, xdrproc_t xargs,
caddr_t argsp, xdrproc_t xres,
caddr_t resp, timeval timeout);
public abstract void clnt_abort();
public abstract rpc_err clnt_geterr();
public abstract boolean clnt_freeres(xdrproc_t xres, caddr_t resp);
public abstract boolean clnt_control(int request, Object info);
};
public class ClientTCP extends Client
{
static final int MCALL_MSG_SIZE = 24;
int ct_sock;
boolean ct_closeit;
timeval ct_bool;
wait_t ct_waitset; /* wait set by clnt_control? */
sockaddr_in ct_addr;
rpc_err ct_error;
byte ct_mcall[] = new byte[MCALL_MSG_SIZE];
/* marshalled callmsg */
int ct_mpos; /* pos after marshal */
XDR ct_xdrs;
public clnt_stat
cl_call(long proc, xdrproc_t xargs,
caddr_t argsp, xdrproc_t xres,
caddr_t resp, timeval timeout)
{ 実体あり }
public void clnt_abort()
{ 実体あり }
public rpc_err clnt_geterr()
{ 実体あり }
public boolean clnt_freeres(xdrproc_t xres, caddr_t resp)
{ 実体あり }
public boolean clnt_control(int request, Object info)
{ 実体あり }
};
/* I/O callbacks handed to xdrrec_create() by clnttcp_create(). */
static int readtcp();
static int writetcp();
/* TCP implementations of the client operations. */
static enum clnt_stat clnttcp_call();
static void clnttcp_abort();
static void clnttcp_geterr();
static bool_t clnttcp_freeres();
static bool_t clnttcp_control();
static void clnttcp_destroy();
/*
* Operations table installed in every TCP client handle.
* The initializers are positional and follow the member order of
* struct clnt_ops: cl_call, cl_abort, cl_geterr, cl_freeres,
* cl_destroy, cl_control.
*/
static struct clnt_ops tcp_ops = {
clnttcp_call,
clnttcp_abort,
clnttcp_geterr,
clnttcp_freeres,
clnttcp_destroy,
clnttcp_control
};
/*
* Private state of one TCP client handle; clnttcp_create() stores a
* pointer to it in CLIENT.cl_private.
*/
struct ct_data {
int ct_sock; /* socket used for the connection */
bool_t ct_closeit; /* TRUE when clnttcp_create() opened the socket itself */
struct timeval ct_wait; /* timeout value; clnttcp_call() loads it unless ct_waitset */
bool_t ct_waitset; /* wait set by clnt_control? */
struct sockaddr_in ct_addr; /* copy of the server address */
struct rpc_err ct_error; /* status of the most recent call */
char ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
u_int ct_mpos; /* pos after marshal */
XDR ct_xdrs; /* XDR stream used to encode calls and decode replies */
};
/*
* Create a client handle for a tcp/ip connection.
* If *sockp<0, *sockp is set to a newly created TCP socket and it is
* connected to raddr. If *sockp non-negative then
* raddr is ignored. The rpc/tcp package does buffering
* similar to stdio, so the client must pick send and receive buffer sizes;
* 0 => use the default.
* If raddr->sin_port is 0, then a binder on the remote machine is
* consulted for the right port number.
* NB: *sockp is copied into a private area.
* NB: It is the client's responsibility to close *sockp.
* NB: The rpch->cl_auth is set null authentication. Caller may wish to set this
* something more useful.
*
* Returns the new handle, or NULL when the port lookup or the header
* serialization fails.
* This is an abridged listing: "..." marks code that is not shown.
*/
CLIENT *
clnttcp_create(raddr, prog, vers, sockp, sendsz, recvsz)
struct sockaddr_in *raddr;
u_long prog;
u_long vers;
register int *sockp;
u_int sendsz;
u_int recvsz;
{
CLIENT *h;
register struct ct_data *ct;
struct timeval now;
struct rpc_msg call_msg;
h = (CLIENT *)mem_alloc(sizeof(*h));
...
ct = (struct ct_data *)mem_alloc(sizeof(*ct));
...
/* port 0 means "unknown": look the port up with pmap_getport() */
if (raddr->sin_port == 0) {
u_short port;
if ((port = pmap_getport(raddr, prog, vers, IPPROTO_TCP)) == 0) {
mem_free((caddr_t)ct, sizeof(struct ct_data));
mem_free((caddr_t)h, sizeof(CLIENT));
return ((CLIENT *)NULL);
}
raddr->sin_port = htons(port);
}
/*
* A negative *sockp asks us to create the socket; ct_closeit records
* whether the socket was created here.
*/
if (*sockp < 0) {
*sockp = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
(void)bindresvport(*sockp, (struct sockaddr_in *)0);
...
ct->ct_closeit = TRUE;
} else {
ct->ct_closeit = FALSE;
}
/*
* Set up private data struct
*/
ct->ct_sock = *sockp;
/* NOTE(review): only tv_usec is set in this listing; tv_sec is not. */
ct->ct_wait.tv_usec = 0;
ct->ct_waitset = FALSE;
ct->ct_addr = *raddr;
/*
* Initialize call message
*/
(void)gettimeofday(&now, (struct timezone *)0);
/* initial transaction id: process id mixed with the current time */
call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
call_msg.rm_direction = CALL;
call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
call_msg.rm_call.cb_prog = prog;
call_msg.rm_call.cb_vers = vers;
/*
* pre-serialize the static part of the call msg and stash it away
*/
xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE,
XDR_ENCODE);
if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
if (ct->ct_closeit) {
(void)close(*sockp);
}
goto fooy;
}
/* remember how many bytes of ct_mcall the serialized header occupies */
ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
XDR_DESTROY(&(ct->ct_xdrs));
/*
* Re-create ct_xdrs as a record stream whose I/O goes through
* readtcp()/writetcp(), with ct as their handle.
*/
xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz,
(caddr_t)ct, readtcp, writetcp);
h->cl_ops = &tcp_ops;
h->cl_private = (caddr_t) ct;
h->cl_auth = authnone_create();
return (h);
fooy:
/*
* Something goofed, free stuff and barf
*/
mem_free((caddr_t)ct, sizeof(struct ct_data));
mem_free((caddr_t)h, sizeof(CLIENT));
return ((CLIENT *)NULL);
}
1: /*
2: * Please do not edit this file.
3: * It was generated using rpcgen.
4: */
5:
6: #include <memory.h> /* for memset */
7: #include "dirlist.h"
8:
9: /* Default timeout can be changed using clnt_control() */
10: static struct timeval TIMEOUT = { 25, 0 };
11:
12: dirlist_res *
13: dirlist_1(char **argp, CLIENT *clnt)
14: {
15: static dirlist_res clnt_res;
16:
17: memset((char *)&clnt_res, 0, sizeof(clnt_res));
18: if (clnt_call (clnt, DIRLIST,
19: (xdrproc_t) xdr_wrapstring, (caddr_t) argp,
20: (xdrproc_t) xdr_dirlist_res, (caddr_t) &clnt_res,
21: TIMEOUT) != RPC_SUCCESS) {
22: return (NULL);
23: }
24: return (&clnt_res);
25: }
enum clnt_stat
clnt_call(clnt, procnum, inproc, in, outproc, out, tout)
CLIENT *clnt;
u_long procnum;
xdrproc_t inproc, outproc;
char *in, *out;
struct timeval tout;
A macro that calls the remote procedure procnum
associated with the client handle, clnt, which is
obtained with an RPC client creation routine such
as clnt_create(). The parameter in is the address
of the procedure's argument(s), and out is the
address of where to place the result(s); inproc is
used to encode the procedure's parameters, and out-
proc is used to decode the procedure's results;
tout is the time allowed for results to come back.
マクロにより、TCP の場合、clnt_tcp.c の clnttcp_call() に行き着く。
/*
* TCP implementation of clnt_call(): marshals the call (pre-serialized
* header, procedure number, credentials, arguments), sends it, then
* reads replies until one carries the expected transaction id and
* decodes the results into results_ptr.
* The return value is the status also left in ct->ct_error.re_status.
*/
static enum clnt_stat
clnttcp_call(h, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
register CLIENT *h;
u_long proc;
xdrproc_t xdr_args;
caddr_t args_ptr;
xdrproc_t xdr_results;
caddr_t results_ptr;
struct timeval timeout;
{
register struct ct_data *ct = (struct ct_data *) h->cl_private;
register XDR *xdrs = &(ct->ct_xdrs);
struct rpc_msg reply_msg;
u_long x_id;
/* the transaction id is the first word of the pre-marshalled header */
u_long *msg_x_id = (u_long *)(ct->ct_mcall); /* yuk */
register bool_t shipnow;
int refreshes = 2;
/* use the caller's timeout unless ct_waitset is set */
if (!ct->ct_waitset) {
ct->ct_wait = timeout;
}
/*
* shipnow is FALSE only when there is no result decoder and the
* timeout is zero; it is passed to xdrrec_endofrecord() below, and
* the function then returns without waiting for a reply.
*/
shipnow =
(xdr_results == (xdrproc_t)0 && timeout.tv_sec == 0
&& timeout.tv_usec == 0) ? FALSE : TRUE;
call_again:
xdrs->x_op = XDR_ENCODE;
ct->ct_error.re_status = RPC_SUCCESS;
/* new id for this attempt: decrement the stored one in place */
x_id = ntohl(--(*msg_x_id));
if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
(! XDR_PUTLONG(xdrs, (long *)&proc)) ||
(! AUTH_MARSHALL(h->cl_auth, xdrs)) ||
(! (*xdr_args)(xdrs, args_ptr))) {
/* keep a more specific status if one was already recorded */
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTENCODEARGS;
(void)xdrrec_endofrecord(xdrs, TRUE);
return (ct->ct_error.re_status);
}
if (! xdrrec_endofrecord(xdrs, shipnow))
return (ct->ct_error.re_status = RPC_CANTSEND);
if (! shipnow)
return (RPC_SUCCESS);
/*
* Hack to provide rpc-based message passing
*/
if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
return(ct->ct_error.re_status = RPC_TIMEDOUT);
}
/*
* Keep receiving until we get a valid transaction id
*/
xdrs->x_op = XDR_DECODE;
while (TRUE) {
/* reset the reply so that the header decode does not touch results */
reply_msg.acpted_rply.ar_verf = _null_auth;
reply_msg.acpted_rply.ar_results.where = NULL;
reply_msg.acpted_rply.ar_results.proc = xdr_void;
if (! xdrrec_skiprecord(xdrs))
return (ct->ct_error.re_status);
/* now decode and validate the response header */
if (! xdr_replymsg(xdrs, &reply_msg)) {
/* no error recorded: skip this record and try the next one */
if (ct->ct_error.re_status == RPC_SUCCESS)
continue;
return (ct->ct_error.re_status);
}
if (reply_msg.rm_xid == x_id)
break;
}
/*
* process header
*/
_seterr_reply(&reply_msg, &(ct->ct_error));
if (ct->ct_error.re_status == RPC_SUCCESS) {
if (! AUTH_VALIDATE(h->cl_auth, &reply_msg.acpted_rply.ar_verf)) {
ct->ct_error.re_status = RPC_AUTHERROR;
ct->ct_error.re_why = AUTH_INVALIDRESP;
/*
* NOTE(review): xdr_results is called without a null check; it can
* be 0 here when the caller passes no decoder with a non-zero timeout.
*/
} else if (! (*xdr_results)(xdrs, results_ptr)) {
if (ct->ct_error.re_status == RPC_SUCCESS)
ct->ct_error.re_status = RPC_CANTDECODERES;
}
/* free verifier ... */
if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
xdrs->x_op = XDR_FREE;
(void)xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
}
} /* end successful completion */
else {
/* maybe our credentials need to be refreshed ... */
/* at most two retries (refreshes starts at 2) */
if (refreshes-- && AUTH_REFRESH(h->cl_auth))
goto call_again;
} /* end of unsuccessful completion */
return (ct->ct_error.re_status);
}
1: /*
2: * Please do not edit this file.
3: * It was generated using rpcgen.
4: */
5:
6: #include "dirlist.h"
7:
8: bool_t
9: xdr_delist (XDR *xdrs, delist *objp)
10: {
11: register int32_t *buf;
12:
13: if (!xdr_string (xdrs, &objp->del_name, ~0))
14: return FALSE;
15: if (!xdr_pointer (xdrs, (char **)&objp->del_next, sizeof (delist), (xdrproc_t) xdr_delist))
16: return FALSE;
17: return TRUE;
18: }
19:
20: bool_t
21: xdr_dirlist_res (XDR *xdrs, dirlist_res *objp)
22: {
23: register int32_t *buf;
24:
25: if (!xdr_int (xdrs, &objp->dlr_errno))
26: return FALSE;
27: switch (objp->dlr_errno) {
28: case 0:
29: if (!xdr_pointer (xdrs, (char **)&objp->dirlist_res_u.dlr_head, sizeof (delist), (xdrproc_t) xdr_delist))
30: return FALSE;
31: break;
32: default:
33: break;
34: }
35: return TRUE;
36: }
データ構造とアルゴリズムが一致している。
xdr_型 が現れる。
xdr_int(xdrs, ip)
XDR *xdrs;
int *ip;
A filter primitive that translates between C inte-
gers and their external representations. This rou-
tine returns one if it succeeds, zero otherwise.
xdr_pointer(xdrs, objpp, objsize, xdrobj)
XDR *xdrs;
char **objpp;
u_int objsize;
xdrproc_t xdrobj;
Like xdr_reference() except that it serializes NULL
pointers, whereas xdr_reference() does not. Thus,
xdr_pointer() can represent recursive data struc-
tures, such as binary trees or linked lists.
xdr_reference(xdrs, pp, size, proc)
XDR *xdrs;
char **pp;
u_int size;
xdrproc_t proc;
A primitive that provides pointer chasing within
structures. The parameter pp is the address of the
pointer; size is the sizeof the structure that *pp
points to; and proc is an XDR procedure that fil-
ters the structure between its C form and its
external representation. This routine returns one
if it succeeds, zero otherwise.
Warning: this routine does not understand NULL
pointers. Use xdr_pointer() instead.
xdr.c
/*
* XDR integers
*/
bool_t
xdr_int(xdrs, ip)
XDR *xdrs;
int *ip;
{
if (sizeof (int) == sizeof (long)) {
return (xdr_long(xdrs, (long *)ip));
} else {
return (xdr_short(xdrs, (short *)ip));
}
}
/*
 * XDR long integers
 * (same job as xdr_u_long, written out here to save a procedure call)
 *
 * Puts *lp on the stream when encoding, fills *lp from the stream when
 * decoding, and has nothing to release when freeing.  Any other
 * operation code yields FALSE.
 */
bool_t
xdr_long(xdrs, lp)
	register XDR *xdrs;
	long *lp;
{
	switch (xdrs->x_op) {
	case XDR_ENCODE:
		return (XDR_PUTLONG(xdrs, lp));
	case XDR_DECODE:
		return (XDR_GETLONG(xdrs, lp));
	case XDR_FREE:
		return (TRUE);
	default:
		return (FALSE);
	}
}
抽象クラス。関数のポインタを含む。
xdr.h
/* Direction of an XDR pass; stored in the x_op field of the XDR handle. */
enum xdr_op {
XDR_ENCODE=0,
XDR_DECODE=1,
XDR_FREE=2
};
/* Type of every xdr_<type>() filter routine (unprototyped, returns bool_t). */
typedef bool_t (*xdrproc_t)();
/*
* The XDR handle.
* Contains operation which is being applied to the stream,
* an operations vector for the particular implementation (e.g. see xdr_mem.c),
* and two private fields for the use of the particular implementation.
*/
typedef struct {
enum xdr_op x_op; /* operation; fast additional param */
struct xdr_ops {
bool_t (*x_getlong)(); /* get a long from underlying stream */
bool_t (*x_putlong)(); /* put a long to " */
bool_t (*x_getbytes)();/* get some bytes from " */
bool_t (*x_putbytes)();/* put some bytes to " */
u_int (*x_getpostn)();/* returns bytes off from beginning */
bool_t (*x_setpostn)();/* lets you reposition the stream */
long * (*x_inline)(); /* buf quick ptr to buffered data */
void (*x_destroy)(); /* free privates of this xdr_stream */
} *x_ops;
caddr_t x_public; /* users' data */
caddr_t x_private; /* pointer to private data */
caddr_t x_base; /* private used for position info */
int x_handy; /* extra private word */
} XDR;
/*
* Stream primitives: each macro calls the matching entry of xdrs->x_ops
* and passes the handle itself as the first argument.  The handle
* expression appears twice in every expansion, so it is evaluated twice.
*/
#define XDR_GETLONG(xdrs, longp) \
(*(xdrs)->x_ops->x_getlong)(xdrs, longp)
#define XDR_PUTLONG(xdrs, longp) \
(*(xdrs)->x_ops->x_putlong)(xdrs, longp)
#define XDR_GETBYTES(xdrs, addr, len) \
(*(xdrs)->x_ops->x_getbytes)(xdrs, addr, len)
#define XDR_PUTBYTES(xdrs, addr, len) \
(*(xdrs)->x_ops->x_putbytes)(xdrs, addr, len)
/*
 * Inline access to a buffer of longs.
 * IXDR_GET_LONG reads the word at buf, converts it from network to host
 * byte order and advances buf by one element.
 * IXDR_PUT_LONG converts v from host to network byte order, stores it at
 * buf and advances buf by one element.
 * Both macros modify buf, so buf must be an lvalue.  v is parenthesized
 * so that the cast applies to the whole argument expression.
 */
#define IXDR_GET_LONG(buf) ((long)ntohl((u_long)*(buf)++))
#define IXDR_PUT_LONG(buf, v) (*(buf)++ = (long)htonl((u_long)(v)))
static u_int fix_buf_size();
/* Record-stream implementations of the XDR stream operations. */
static bool_t xdrrec_getlong();
static bool_t xdrrec_putlong();
static bool_t xdrrec_getbytes();
static bool_t xdrrec_putbytes();
static u_int xdrrec_getpos();
static bool_t xdrrec_setpos();
static long * xdrrec_inline();
static void xdrrec_destroy();
/*
* Operations table installed by xdrrec_create().
* The initializers are positional and follow the member order of
* struct xdr_ops: x_getlong, x_putlong, x_getbytes, x_putbytes,
* x_getpostn, x_setpostn, x_inline, x_destroy.
*/
static struct xdr_ops xdrrec_ops = {
xdrrec_getlong,
xdrrec_putlong,
xdrrec_getbytes,
xdrrec_putbytes,
xdrrec_getpos,
xdrrec_setpos,
xdrrec_inline,
xdrrec_destroy
};
void
xdrrec_create(xdrs, sendsize, recvsize, tcp_handle, readit, writeit)
register XDR *xdrs;
register u_int sendsize;
register u_int recvsize;
caddr_t tcp_handle;
int (*readit)(); /* like read, but pass it a tcp_handle, not sock */
int (*writeit)(); /* like write, but pass it a tcp_handle, not sock */
{
register RECSTREAM *rstrm =
(RECSTREAM *)mem_alloc(sizeof(RECSTREAM));
...
xdrs->x_ops = &xdrrec_ops;
xdrs->x_private = (caddr_t)rstrm;
rstrm->tcp_handle = tcp_handle;
rstrm->writeit = writeit;
}
/*
* The routines defined below are the xdr ops which will go into the
* xdr handle filled in by xdrrec_create.
*/
/*
* Read one long from the record stream into *lp, converting it from
* network to host byte order.  Returns TRUE on success, FALSE when the
* bytes cannot be obtained.
*/
static bool_t
xdrrec_getlong(xdrs, lp)
XDR *xdrs;
long *lp;
{
register RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
register long *buflp = (long *)(rstrm->in_finger);
long mylong;
/* first try the inline, fast case */
/*
* NOTE(review): the buffer pointers are cast to int before being
* subtracted; confirm this is safe where pointers are wider than int.
*/
if ((rstrm->fbtbc >= sizeof(long)) &&
(((int)rstrm->in_boundry - (int)buflp) >= sizeof(long))) {
*lp = (long)ntohl((u_long)(*buflp));
rstrm->fbtbc -= sizeof(long);
rstrm->in_finger += sizeof(long);
} else {
/* slow case: go through the general byte reader */
if (! xdrrec_getbytes(xdrs, (caddr_t)&mylong, sizeof(long)))
return (FALSE);
*lp = (long)ntohl((u_long)mylong);
}
return (TRUE);
}
/*
* Append *lp to the output buffer of the record stream in network byte
* order.  Returns TRUE on success, FALSE when the buffer has to be
* flushed and flush_out() fails.
*/
static bool_t
xdrrec_putlong(xdrs, lp)
XDR *xdrs;
long *lp;
{
register RECSTREAM *rstrm = (RECSTREAM *)(xdrs->x_private);
register long *dest_lp = ((long *)(rstrm->out_finger));
/* advance first, then check whether the long still fits */
if ((rstrm->out_finger += sizeof(long)) > rstrm->out_boundry) {
/*
* this case should almost never happen so the code is
* inefficient
*/
/* undo the advance, flush what is buffered, then take a fresh slot */
rstrm->out_finger -= sizeof(long);
rstrm->frag_sent = TRUE;
if (! flush_out(rstrm, FALSE))
return (FALSE);
dest_lp = ((long *)(rstrm->out_finger));
rstrm->out_finger += sizeof(long);
}
*dest_lp = (long)htonl((u_long)(*lp));
return (TRUE);
}
...
#define LASTUNSIGNED ((u_int)0-1)
/*
 * XDR the object that *pp points to, using proc.
 *
 * With a null *pp: freeing succeeds at once, and decoding first
 * allocates size zero-filled bytes and stores their address in *pp
 * (FALSE is returned when the allocation fails).  proc is then applied
 * to the object; when freeing, the object is released afterwards and
 * *pp is cleared.  The return value is the result of proc.
 */
bool_t
xdr_reference(xdrs, pp, size, proc)
	register XDR *xdrs;
	caddr_t *pp;		/* the pointer to work on */
	u_int size;		/* size of the object pointed to */
	xdrproc_t proc;		/* xdr routine to handle the object */
{
	register caddr_t obj = *pp;
	register bool_t ok;

	if (obj == NULL) {
		if (xdrs->x_op == XDR_FREE) {
			return (TRUE);
		}
		if (xdrs->x_op == XDR_DECODE) {
			obj = (caddr_t) mem_alloc(size);
			*pp = obj;
			if (obj == NULL) {
				(void) fprintf(stderr,
				    "xdr_reference: out of memory\n");
				return (FALSE);
			}
			bzero(obj, (int)size);
		}
	}

	ok = (*proc)(xdrs, obj, LASTUNSIGNED);

	if (xdrs->x_op == XDR_FREE) {
		mem_free(obj, size);
		*pp = NULL;
	}
	return (ok);
}
/*
 * XDR a pointer that is allowed to be null.
 *
 * A boolean "object present" flag goes on the stream ahead of the
 * object.  It is derived from *objpp before the flag is processed (so
 * this is the value that gets encoded) and holds whatever xdr_bool()
 * left in it afterwards.  When the flag is false *objpp is set to
 * NULL; otherwise the object itself is handled by xdr_reference().
 */
bool_t
xdr_pointer(xdrs,objpp,obj_size,xdr_obj)
	register XDR *xdrs;
	char **objpp;
	u_int obj_size;
	xdrproc_t xdr_obj;
{
	bool_t present = (*objpp != NULL);

	if (! xdr_bool(xdrs,&present)) {
		return (FALSE);
	}
	if (present) {
		return (xdr_reference(xdrs,objpp,obj_size,xdr_obj));
	}
	*objpp = NULL;
	return (TRUE);
}
/*
 * XDR null terminated ASCII strings
 *
 * A string travels as a length word followed by that many bytes.
 * cpp points at the string pointer; when decoding into a null pointer
 * the storage (length + 1 bytes) is allocated here.  maxsize is the
 * largest length the protocol allows; longer strings are refused.
 * Returns TRUE on success and FALSE otherwise.
 */
bool_t
xdr_string(xdrs, cpp, maxsize)
	register XDR *xdrs;
	char **cpp;
	u_int maxsize;
{
	register char *str = *cpp;	/* the string itself */
	u_int len;
	u_int alloc_len;

	/*
	 * The length comes first.  It is known up front when encoding
	 * or freeing; when decoding it is read from the stream below.
	 */
	if (xdrs->x_op == XDR_FREE && str == NULL) {
		return(TRUE);	/* nothing to free */
	}
	if (xdrs->x_op == XDR_FREE || xdrs->x_op == XDR_ENCODE) {
		len = strlen(str);
	}
	if (! xdr_u_int(xdrs, &len) || len > maxsize) {
		return (FALSE);
	}
	alloc_len = len + 1;

	/*
	 * Then the bytes.
	 */
	if (xdrs->x_op == XDR_FREE) {
		mem_free(str, alloc_len);
		*cpp = NULL;
		return (TRUE);
	}
	if (xdrs->x_op == XDR_DECODE) {
		if (alloc_len == 0) {
			return (TRUE);
		}
		if (str == NULL) {
			str = (char *)mem_alloc(alloc_len);
			*cpp = str;
		}
		if (str == NULL) {
			(void) fprintf(stderr, "xdr_string: out of memory\n");
			return (FALSE);
		}
		str[len] = 0;
	} else if (xdrs->x_op != XDR_ENCODE) {
		return (FALSE);
	}
	/* decoding and encoding both finish by moving the bytes */
	return (xdr_opaque(xdrs, str, len));
}
/*
* Server side transport handle
*
* Transport-specific behavior is reached through xp_ops, a table of
* function pointers; xp_p1 and xp_p2 are private to the implementation.
*/
typedef struct {
int xp_sock;
u_short xp_port; /* associated port number */
struct xp_ops {
bool_t (*xp_recv)(); /* receive incoming requests */
enum xprt_stat (*xp_stat)(); /* get transport status */
bool_t (*xp_getargs)(); /* get arguments */
bool_t (*xp_reply)(); /* send reply */
bool_t (*xp_freeargs)();/* free mem allocated for args */
void (*xp_destroy)(); /* destroy this struct */
} *xp_ops;
int xp_addrlen; /* length of remote address */
struct sockaddr_in xp_raddr; /* remote address */
struct opaque_auth xp_verf; /* raw response verifier */
caddr_t xp_p1; /* private */
caddr_t xp_p2; /* private */
} SVCXPRT;
/*
* Calls the transport's xp_getargs operation with the transport itself
* as the first argument.  xprt appears twice in the expansion, so it is
* evaluated twice.
*/
#define svc_getargs(xprt, xargs, argsp) \
(*(xprt)->xp_ops->xp_getargs)((xprt), (xargs), (argsp))
1: /*
2: * Please do not edit this file.
3: * It was generated using rpcgen.
4: */
5:
6: #include "dirlist.h"
7: #include <stdio.h>
8: #include <stdlib.h>
9: #include <rpc/pmap_clnt.h>
10: #include <string.h>
11: #include <memory.h>
12: #include <sys/socket.h>
13: #include <netinet/in.h>
14:
15: #ifndef SIG_PF
16: #define SIG_PF void(*)(int)
17: #endif
18:
19: static void
20: dirlist_prog_1(struct svc_req *rqstp, register SVCXPRT *transp)
21: {
22: union {
23: char *dirlist_1_arg;
24: } argument;
25: char *result;
26: xdrproc_t _xdr_argument, _xdr_result;
27: char *(*local)(char *, struct svc_req *);
28:
29: switch (rqstp->rq_proc) {
30: case NULLPROC:
31: (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
32: return;
33:
34: case DIRLIST:
35: _xdr_argument = (xdrproc_t) xdr_wrapstring;
36: _xdr_result = (xdrproc_t) xdr_dirlist_res;
37: local = (char *(*)(char *, struct svc_req *)) dirlist_1_svc;
38: break;
39:
40: default:
41: svcerr_noproc (transp);
42: return;
43: }
44: memset ((char *)&argument, 0, sizeof (argument));
45: if (!svc_getargs (transp, _xdr_argument, (caddr_t) &argument)) {
46: svcerr_decode (transp);
47: return;
48: }
49: result = (*local)((char *)&argument, rqstp);
50: if (result != NULL && !svc_sendreply(transp, _xdr_result, result)) {
51: svcerr_systemerr (transp);
52: }
53: if (!svc_freeargs (transp, _xdr_argument, (caddr_t) &argument)) {
54: fprintf (stderr, "%s", "unable to free arguments");
55: exit (1);
56: }
57: return;
58: }
59:
60: int
61: main (int argc, char **argv)
62: {
63: register SVCXPRT *transp;
64:
65: pmap_unset (DIRLIST_PROG, DIRLIST_VERSION);
66:
67: transp = svcudp_create(RPC_ANYSOCK);
68: if (transp == NULL) {
69: fprintf (stderr, "%s", "cannot create udp service.");
70: exit(1);
71: }
72: if (!svc_register(transp, DIRLIST_PROG, DIRLIST_VERSION, dirlist_prog_1, IPPROTO_UDP)) {
73: fprintf (stderr, "%s", "unable to register (DIRLIST_PROG, DIRLIST_VERSION, udp).");
74: exit(1);
75: }
76:
77: transp = svctcp_create(RPC_ANYSOCK, 0, 0);
78: if (transp == NULL) {
79: fprintf (stderr, "%s", "cannot create tcp service.");
80: exit(1);
81: }
82: if (!svc_register(transp, DIRLIST_PROG, DIRLIST_VERSION, dirlist_prog_1, IPPROTO_TCP)) {
83: fprintf (stderr, "%s", "unable to register (DIRLIST_PROG, DIRLIST_VERSION, tcp).");
84: exit(1);
85: }
86:
87: svc_run ();
88: fprintf (stderr, "%s", "svc_run returned");
89: exit (1);
90: /* NOTREACHED */
91: }
名前_prog_1() が
生成される。
svc_register(xprt, prognum, versnum, dispatch, protocol)
SVCXPRT *xprt;
u_long prognum, versnum;
void (*dispatch) ();
u_long protocol;
Associates prognum and versnum with the service
dispatch procedure, dispatch. If protocol is zero,
the service is not registered with the portmap ser-
vice. If protocol is non-zero, then a mapping of
the triple [prognum,versnum,protocol] to
xprt->xp_port is established with the local portmap
service (generally protocol is zero, IPPROTO_UDP or
IPPROTO_TCP ). The procedure dispatch has the fol-
lowing form:
dispatch(request, xprt)
struct svc_req *request;
SVCXPRT *xprt;
The svc_register() routine returns one if it suc-
ceeds, and zero otherwise.
svc_run()
This routine never returns. It waits for RPC
requests to arrive, and calls the appropriate ser-
vice procedure using svc_getreq() when one arrives.
This procedure is usually waiting for a select()
system call to return.
svc_sendreply(xprt, outproc, out)
SVCXPRT *xprt;
xdrproc_t outproc;
char *out;
Called by an RPC service's dispatch routine to send
the results of a remote procedure call. The param-
eter xprt is the request's associated transport
handle; outproc is the XDR routine which is used to
encode the results; and out is the address of the
results. This routine returns one if it succeeds,
zero otherwise.
/*
 * Main server loop: waits in select() on a copy of svc_fdset and hands
 * the set of ready descriptors to svc_getreqset().
 *
 * The wait is restarted when select() is interrupted (EINTR) or reports
 * no ready descriptor.  Any other select() failure is reported with
 * perror() and ends the loop; that is the only way this function
 * returns.
 */
void
svc_run()
{
	fd_set readfds;
	extern int errno;

	for (;;) {
		/* select() modifies the set it is given: use a fresh copy */
		readfds = svc_fdset;
		switch (select(_rpc_dtablesize(), &readfds, (fd_set *)0,
			       (fd_set *)0, (struct timeval *)0)) {
		case -1:
			if (errno == EINTR) {
				continue;
			}
			perror("svc_run: - select failed");
			return;
		case 0:
			continue;
		default:
			svc_getreqset(&readfds);
			break;
		}
	}
}
全部のプログラムに対して select() で要求を待つ。
void
svc_getreqset(readfds)
fd_set *readfds;
{
enum xprt_stat stat;
struct rpc_msg msg;
int prog_found;
u_long low_vers;
u_long high_vers;
struct svc_req r;
register SVCXPRT *xprt;
register u_long mask;
register int bit;
register u_long *maskp;
register int setsize;
register int sock;
char cred_area[2*MAX_AUTH_BYTES + RQCRED_SIZE];
msg.rm_call.cb_cred.oa_base = cred_area;
msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
r.rq_clntcred = &(cred_area[2*MAX_AUTH_BYTES]);
setsize = _rpc_dtablesize();
maskp = (u_long *)readfds->fds_bits;
for (sock = 0; sock < setsize; sock += NFDBITS) {
for (mask = *maskp++; bit = ffs(mask); mask ^= (1 << (bit - 1))) {
/* sock has input waiting */
xprt = xports[sock + bit - 1];
/* now receive msgs from xprtprt (support batch calls) */
do {
if (SVC_RECV(xprt, &msg)) {
/* now find the exported program and call it */
register struct svc_callout *s;
enum auth_stat why;
r.rq_xprt = xprt;
r.rq_prog = msg.rm_call.cb_prog;
r.rq_vers = msg.rm_call.cb_vers;
r.rq_proc = msg.rm_call.cb_proc;
r.rq_cred = msg.rm_call.cb_cred;
/* first authenticate the message */
if ((why= _authenticate(&r, &msg)) != AUTH_OK) {
svcerr_auth(xprt, why);
goto call_done;
}
/* now match message with a registered service*/
prog_found = FALSE;
low_vers = 0 - 1;
high_vers = 0;
for (s = svc_head; s != NULL_SVC; s = s->sc_next) {
if (s->sc_prog == r.rq_prog) {
if (s->sc_vers == r.rq_vers) {
(*s->sc_dispatch)(&r, xprt);
goto call_done;
} /* found correct version */
prog_found = TRUE;
if (s->sc_vers < low_vers)
low_vers = s->sc_vers;
if (s->sc_vers > high_vers)
high_vers = s->sc_vers;
} /* found correct program */
}
/*
* if we got here, the program or version
* is not served ...
*/
if (prog_found)
svcerr_progvers(xprt,
low_vers, high_vers);
else
svcerr_noprog(xprt);
/* Fall through to ... */
}
call_done:
if ((stat = SVC_STAT(xprt)) == XPRT_DIED){
SVC_DESTROY(xprt);
break;
}
} while (stat == XPRT_MOREREQS);
}
}
}
グループ通信で何が難しいか
対等だと、クラッシュに強い(single point of failure がない)が、何をするにもすぐ投票が必要になる。
グループサーバを持つか持たないか。
難しいのは、メンバのプロセスがクラッシュした時。 何も言わずにグループから抜ける。
total ordering は、グループ通信では実現が難しすぎる。弱いものが実現される。
プロセスが複数のグループに属していると、1つのメッセージについての順序だけ考えていてはすまなくなる。
下手をすると、メッセージが増幅しながらいつまでもぐるぐる回る。
最初の実現:2相コミットによる ABCAST 。重すぎる。
メンバの数:n
各プロセスは、グループごとに、長さnのベクトルVを持つ。 i番目の要素は、プロセスiから正しい順序で受信したメッセージの最後の番号。 0に初期化される。

図? ISISでのCBCASTの実現。