歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Linux編程之從零開始搭建RPC分布式系統

Linux編程之從零開始搭建RPC分布式系統

日期:2017/3/1 9:05:40   编辑:Linux編程
我一畢業進公司就接觸到了RPC,主要是使用前輩們搭建好的RPC框架以及封裝好的RPC函數進行業務開發,雖說使用RPC框架開發已經近半年了,但一直想知道如何從零開始搭建起這麼一個好用的分布式通信系統框架,近日心血來潮,雖說沒人教怎麼搭建,但自己在網上查閱了大量資料後,開始自己一手一腳從零搭建這麼一個RPC框架,所以就有了以下這篇文章,以記錄我的搭建過程。 首先對RPC做一個簡要介紹。 RPC的全稱是Remote Procedure Call,它能夠在本地以函數調用的形式來實現網絡操作,讓程序員集中關注於業務邏輯,不用關心底層的數據通信。 網絡通信的應用程序大多數是使用顯式網絡編程(explicit network programming)的方式編寫的,比如我們所熟悉的socket編程。客戶端調用socket、connect、read和write,服務器則調用socket、bind、listen等函數。我們熟悉的大多數應用程序(Web浏覽器、Web服務器、Telnet客戶、Telnet服務器等程序)就是以這種方式編寫的。 編寫分布式應用程序的另一種方法就是使用隱式網絡編程(implicit network programming)。遠程過程調用(RPC)提供了這麼一個工具。使用隱式網絡編程的好處就是,程序員不需要把把大量精力放在網絡通信程序的編寫上,因為這一塊工作已經有RPC框架幫你實現了,所以程序員可以把更多精力放在業務邏輯的開發上去。 這裡就不對RPC做進一步詳細的理論性解析,這篇文章主要講述RPC的實踐,我們將一步一步搭建起一個基於RPC的完整的分布式通信系統框架。本文分為兩個部分,第一部分講述如何利用rpcgen工具搭建起來RPC通用骨架,第二部分我們就使用該骨架進行進一步完善,增加相應的處理函數,把血肉補充完全,做一個簡單的分布式計算系統demo。

參考資料:

  1. 《UNIX網絡編程.卷2:進程間通信(第2版)》[PDF] 下載見 http://www.linuxidc.com/Linux/2013-01/77936.htm
  2. 《Linux C高級程序員指南》 PDF下載見 http://www.linuxidc.com/Linux/2017-02/140414.htm
  3. 《rpcgen_mannual》

rpcgen_mannual PDF可以到Linux公社資源站下載:

------------------------------------------分割線------------------------------------------

免費下載地址在 http://linux.linuxidc.com/

用戶名與密碼都是www.linuxidc.com

具體下載目錄在 /2017年資料/2月/9日/Linux編程之從零開始搭建RPC分布式系統/

下載方法見 http://www.linuxidc.com/Linux/2013-07/87684.htm

------------------------------------------分割線------------------------------------------

cccccc

一、使用rpcgen工具生成RPC底層骨架 1.生成my.x文件,然後在該文件編寫以下程序 首先創建文件夾rpc(mkdir rpc),以後的所有文件都放在這個文件夾下。 創建my.x文件(my為文件名,.x為後綴):vi my.x 在my.x填入下面代碼:
#define MY_RPC_PROG_NUM         0x38000010   //程序號

struct my_io_data_s        //定義消息結構
{
    int mtype;
    int len;
    char data[1024];
};

typedef struct my_io_data_s my_io_data_t;

program MY_RPC_PROG { 

    version MY_RPC_VERS1 {
        int MY_RPCC(my_io_data_t) = 1;    /* 過程號 = 1 */
    } = 1;        /* Version number = 1 */

    version MY_RPC_VERS2 {
        my_io_data_t MY_RPCC(my_io_data_t) = 1;    /* 過稱號 = 1 */
    } = 2;        /* Version number = 2 */

} = MY_RPC_PROG_NUM;    /* Program number */
這裡我創建了兩個版本,version1和version2,版本的數量是可以自己定制的,如果你需要一個的話定義一個即可。因為我打算定義一個版本用於SET的操作,一個用於GET操作,所以定義了兩個版本。 上面使用了RPC語言,我對以上幾個特殊名詞做一下解釋。 每個RPC過程由程序號、版本號和過程號來唯一確定。 RPC版本號:程序號標志一組相關的遠程過程,程序號的是有范圍的,我們需要在范圍內填寫程序號。 程序號范圍 簡述 0x00000000 - 0x1FFFFFFF 由Sun公司定義,提供特定服務 0x20000000 - 0x3FFFFFFF 由程序員自己定義,提供本地服務或用於調試 0x40000000 - 0x5FFFFFFF 用於短時間使用的程序,例如回調程序 0x60000000 - 0xFFFFFFFF 保留程序號 這裡我們使用的范圍當然是0x20000000 - 0x3FFFFFFF,我填的是0x38000010。 版本號:在version的大括號裡我們定義兩個我們將要使用的RPC調用函數的類型,比如: version 1:我們定義了int MY_RPCC(my_io_data_t),這表明我們以後PRC框架使用的RPC調用函數的函數類型那個將會是:int * my_rpcc_1(my_io_data_t *argp, CLIENT *clnt) version 2:my_io_data_t MY_RPCC(my_io_data_t) 則會變成 my_io_data_t * my_rpcc_2(my_io_data_t *argp, CLIENT *clnt) 所以我們可以根據我們需要的類型模仿編寫即可。 2.使用rpcgen指令生成以下幾個文件 使用rpcgen my.x生成系列文件(可以加參數-C,表示使用ANSI C) 執行該指令後,文件夾下將出現以下幾個文件。 my.h:
/*
 * Please do not edit this file.
 * It was generated using rpcgen.
 */

#ifndef _MY_H_RPCGEN
#define _MY_H_RPCGEN

#include <rpc/rpc.h>


#ifdef __cplusplus
extern "C" {
#endif


struct my_io_data_s {
    int mtype;
    int len;
    char data[1024];
};
typedef struct my_io_data_s my_io_data_s;

typedef my_io_data_s my_io_data_t;

#define MY_RPC_PROG 666
#define MY_RPC_VERS1 1

#if defined(__STDC__) || defined(__cplusplus)
#define MY_RPCC 1
extern  int * my_rpcc_1(my_io_data_t *, CLIENT *);
extern  int * my_rpcc_1_svc(my_io_data_t *, struct svc_req *);
extern int my_rpc_prog_1_freeresult (SVCXPRT *, xdrproc_t, caddr_t);

#else /* K&R C */
#define MY_RPCC 1
extern  int * my_rpcc_1();
extern  int * my_rpcc_1_svc();
extern int my_rpc_prog_1_freeresult ();
#endif /* K&R C */
#define MY_RPC_VERS2 2

#if defined(__STDC__) || defined(__cplusplus)
extern  my_io_data_t * my_rpcc_2(my_io_data_t *, CLIENT *);
extern  my_io_data_t * my_rpcc_2_svc(my_io_data_t *, struct svc_req *);
extern int my_rpc_prog_2_freeresult (SVCXPRT *, xdrproc_t, caddr_t);

#else /* K&R C */
extern  my_io_data_t * my_rpcc_2();
extern  my_io_data_t * my_rpcc_2_svc();
extern int my_rpc_prog_2_freeresult ();
#endif /* K&R C */

/* the xdr functions */

#if defined(__STDC__) || defined(__cplusplus)
extern  bool_t xdr_my_io_data_s (XDR *, my_io_data_s*);
extern  bool_t xdr_my_io_data_t (XDR *, my_io_data_t*);

#else /* K&R C */
extern bool_t xdr_my_io_data_s ();
extern bool_t xdr_my_io_data_t ();

#endif /* K&R C */

#ifdef __cplusplus
}
#endif

#endif /* !_MY_H_RPCGEN */
View Code

my_clnt.c:

/*
 * Please do not edit this file.
 * It was generated using rpcgen.
 */

#include <memory.h> /* for memset */
#include "my.h"

/* Default timeout can be changed using clnt_control() */
static struct timeval TIMEOUT = { 25, 0 };

int *
my_rpcc_1(my_io_data_t *argp, CLIENT *clnt)
{
    static int clnt_res;

    memset((char *)&clnt_res, 0, sizeof(clnt_res));
    if (clnt_call (clnt, MY_RPCC,
        (xdrproc_t) xdr_my_io_data_t, (caddr_t) argp,
        (xdrproc_t) xdr_int, (caddr_t) &clnt_res,
        TIMEOUT) != RPC_SUCCESS) {
        return (NULL);
    }
    return (&clnt_res);
}

my_io_data_t *
my_rpcc_2(my_io_data_t *argp, CLIENT *clnt)
{
    static my_io_data_t clnt_res;

    memset((char *)&clnt_res, 0, sizeof(clnt_res));
    if (clnt_call (clnt, MY_RPCC,
        (xdrproc_t) xdr_my_io_data_t, (caddr_t) argp,
        (xdrproc_t) xdr_my_io_data_t, (caddr_t) &clnt_res,
        TIMEOUT) != RPC_SUCCESS) {
        return (NULL);
    }
    return (&clnt_res);
}
View Code

my_svc.c

/*
 * Please do not edit this file.
 * It was generated using rpcgen.
 */

#include "my.h"
#include <stdio.h>
#include <stdlib.h>
#include <rpc/pmap_clnt.h>
#include <string.h>
#include <memory.h>
#include <sys/socket.h>
#include <netinet/in.h>

#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif

static void
my_rpc_prog_1(struct svc_req *rqstp, register SVCXPRT *transp)
{
    union {
        my_io_data_t my_rpcc_1_arg;
    } argument;
    char *result;
    xdrproc_t _xdr_argument, _xdr_result;
    char *(*local)(char *, struct svc_req *);

    switch (rqstp->rq_proc) {
    case NULLPROC:
        (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
        return;

    case MY_RPCC:
        _xdr_argument = (xdrproc_t) xdr_my_io_data_t;
        _xdr_result = (xdrproc_t) xdr_int;
        local = (char *(*)(char *, struct svc_req *)) my_rpcc_1_svc;
        break;

    default:
        svcerr_noproc (transp);
        return;
    }
    memset ((char *)&argument, 0, sizeof (argument));
    if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
        svcerr_decode (transp);
        return;
    }
    result = (*local)((char *)&argument, rqstp);
    if (result != NULL && !svc_sendreply(transp, (xdrproc_t) _xdr_result, result)) {
        svcerr_systemerr (transp);
    }
    if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
        fprintf (stderr, "%s", "unable to free arguments");
        exit (1);
    }
    return;
}

static void
my_rpc_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
    union {
        my_io_data_t my_rpcc_2_arg;
    } argument;
    char *result;
    xdrproc_t _xdr_argument, _xdr_result;
    char *(*local)(char *, struct svc_req *);

    switch (rqstp->rq_proc) {
    case NULLPROC:
        (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
        return;

    case MY_RPCC:
        _xdr_argument = (xdrproc_t) xdr_my_io_data_t;
        _xdr_result = (xdrproc_t) xdr_my_io_data_t;
        local = (char *(*)(char *, struct svc_req *)) my_rpcc_2_svc;
        break;

    default:
        svcerr_noproc (transp);
        return;
    }
    memset ((char *)&argument, 0, sizeof (argument));
    if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
        svcerr_decode (transp);
        return;
    }
    result = (*local)((char *)&argument, rqstp);
    if (result != NULL && !svc_sendreply(transp, (xdrproc_t) _xdr_result, result)) {
        svcerr_systemerr (transp);
    }
    if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
        fprintf (stderr, "%s", "unable to free arguments");
        exit (1);
    }
    return;
}

int
main (int argc, char **argv)
{
    register SVCXPRT *transp;

    pmap_unset (MY_RPC_PROG, MY_RPC_VERS1);
    pmap_unset (MY_RPC_PROG, MY_RPC_VERS2);

    transp = svcudp_create(RPC_ANYSOCK);
    if (transp == NULL) {
        fprintf (stderr, "%s", "cannot create udp service.");
        exit(1);
    }
    if (!svc_register(transp, MY_RPC_PROG, MY_RPC_VERS1, my_rpc_prog_1, IPPROTO_UDP)) {
        fprintf (stderr, "%s", "unable to register (MY_RPC_PROG, MY_RPC_VERS1, udp).");
        exit(1);
    }
    if (!svc_register(transp, MY_RPC_PROG, MY_RPC_VERS2, my_rpc_prog_2, IPPROTO_UDP)) {
        fprintf (stderr, "%s", "unable to register (MY_RPC_PROG, MY_RPC_VERS2, udp).");
        exit(1);
    }

    transp = svctcp_create(RPC_ANYSOCK, 0, 0);
    if (transp == NULL) {
        fprintf (stderr, "%s", "cannot create tcp service.");
        exit(1);
    }
    if (!svc_register(transp, MY_RPC_PROG, MY_RPC_VERS1, my_rpc_prog_1, IPPROTO_TCP)) {
        fprintf (stderr, "%s", "unable to register (MY_RPC_PROG, MY_RPC_VERS1, tcp).");
        exit(1);
    }
    if (!svc_register(transp, MY_RPC_PROG, MY_RPC_VERS2, my_rpc_prog_2, IPPROTO_TCP)) {
        fprintf (stderr, "%s", "unable to register (MY_RPC_PROG, MY_RPC_VERS2, tcp).");
        exit(1);
    }

    svc_run ();
    fprintf (stderr, "%s", "svc_run returned");
    exit (1);
    /* NOTREACHED */
}
View Code

my_xdr.c

/*
 * Please do not edit this file.
 * It was generated using rpcgen.
 */

#include "my.h"

bool_t
xdr_my_io_data_s (XDR *xdrs, my_io_data_s *objp)
{
    register int32_t *buf;

    int i;
     if (!xdr_int (xdrs, &objp->mtype))
         return FALSE;
     if (!xdr_int (xdrs, &objp->len))
         return FALSE;
     if (!xdr_vector (xdrs, (char *)objp->data, 1024,
        sizeof (char), (xdrproc_t) xdr_char))
         return FALSE;
    return TRUE;
}

bool_t
xdr_my_io_data_t (XDR *xdrs, my_io_data_t *objp)
{
    register int32_t *buf;

     if (!xdr_my_io_data_s (xdrs, objp))
         return FALSE;
    return TRUE;
}

更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2017-02/140415p2.htm

Copyright © Linux教程網 All Rights Reserved