歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Golang 1.4 net/rpc client源碼解析

Golang 1.4 net/rpc client源碼解析

日期:2017/3/1 9:29:55   编辑:Linux編程

net/rpc是golang標准庫提供的rpc框架,下面我們重點看下net/rpc是如何實現的。 我本機源碼安裝路徑在/usr/local/go,這net/rpc(golang 1.4版本)涉及到的相關代碼主要有:

client.go

server.go

首先我們先從client.go,客戶端入手看:

type ClientCodecinterface{// WriteRequest must be safe for concurrent use by multiple goroutines.     WriteRequest(*Request,interface{}) error                                   
    ReadResponseHeader(*Response) error                                         
    ReadResponseBody(interface{}) error                                         

    Close() error                                                               
}   

type Callstruct{ServiceMethodstring// The name of the service and method to call.    Argsinterface{}// The argument to the function (*struct).           Replyinterface{}// The reply from the function (*struct).            Error         error       // After completion, the error status.               Done          chan *Call// Strobes when call is complete.                    Tracer*Trace// tracer                                            }  

type Clientstruct{                                                               
    codec ClientCodec                                                              

    reqMutex sync.Mutex// protects following                                      
    request  Request                                                               

    mutex    sync.Mutex// protects following                                      
    seq      uint64                                                                
    pending  map[uint64]*Call                                                      
    closing  bool// user has called Close                                         
    shutdown bool// server has told us to stop                                    } 

func (client *Client) send(call *Call){// client要想復用,保證線程安全,加上請求鎖reqMutex是必須的。                                
    client.reqMutex.Lock()                                                      
    defer client.reqMutex.Unlock()// 這其實是針對map的另外一把鎖,這樣可以更細粒度的操作                                                    
    client.mutex.Lock()// client如果外部調用關閉,那麼call也是結束狀態,之後我們再分析call.done()                                                         if client.shutdown || client.closing {                                      
        call.Error=ErrShutdown                                                
        client.mutex.Unlock()                                                   
        call.done()return}// 重點來了!seq序號自增在把call請求暫存在pennding的map中,鎖釋放                                                                  
    seq := client.seq                                                           
    client.seq++                                                                
    client.pending[seq]= call                                                  
    client.mutex.Unlock()// 這一塊代碼屬於編碼請求了,因為rpc涉及到調用具體是誰,所以需要把method傳給rpc server// 這裡的Seq是用於當server response的時候,seq從client->server,再從server->client,然後反查map,定位call對象使用的。                                         
    client.request.Seq= seq                                                    
    client.request.ServiceMethod= call.ServiceMethod// inject tracer,這個請忽視。。。                                                         
    client.request.Tracer= call.Tracer                                         
    err := client.codec.WriteRequest(&client.request, call.Args)if err !=nil{                                                             
        client.mutex.Lock()                                                     
        call = client.pending[seq]delete(client.pending, seq)                                             
        client.mutex.Unlock()if call !=nil{                                                        
            call.Error= err                                                    
            call.done()}}}

我們使用rpc的時候,都知道client是線程安全的,client其實是基於單個socket連接來,依賴channel來實現復用連接以及並行的。而臨時的調用對象Call都是保存在Client的map中的,至於每個call怎麼查找,也是根據seq序列號在請求server時候轉發過去,之後response的時候,client根據返回的seq再反查結果的。不難看出,實現了ClientCodec之後就可以自定義rpc協議請求頭和內容了。那麼send函數中的Call對象是從哪裡來的?

// 我們rpc請求的時候,調用就是這個方法,傳入方法名,參數,獲取返回等
func (client *Client)Call(serviceMethod string, args interface{}, reply interface{}) error {// Call裡面調用了client.Go,然後返回一個chan,之後阻塞等待,這是基本的同步調用
    call :=<-client.Go(serviceMethod, args, reply, make(chan *Call,1)).Donereturn call.Error} 

func (client *Client)Go(serviceMethod string, args interface{}, reply interface{},done chan *Call)*Call{// 構建call對象
    call :=new(Call)                                                           
    call.ServiceMethod= serviceMethod                                          
    call.Args= args                                                            
    call.Reply= reply   
    // 如果非外部傳入call,自己構建                                                       ifdone==nil{done= make(chan *Call,10)// buffered.                                }else{// If caller passes done != nil, it must arrange that                   // done has enough buffer for the number of simultaneous                // RPCs that will be using that channel.  If the channel                // is totally unbuffered, it's best not to run at all.                  if cap(done)==0{                                                     
            log.Panic("rpc: done channel is unbuffered")}}                                                                           
    call.Done=done// 發送請求                                                            
    client.send(call)return call                                                                 
}

在初始化client的時候,我們會指定ip,port等

// Dial connects to an RPC server at the specified network address.             
func Dial(network, address string)(*Client, error){                           
    conn, err := net.Dial(network, address)if err !=nil{returnnil, err                                                         
    }returnNewClient(conn),nil}// 我們看到其實NewClient內部使用的默認的gob編碼,gobClientCodes實現了Codec的接口                              
func NewClient(conn io.ReadWriteCloser)*Client{                               
    encBuf := bufio.NewWriter(conn)                                             
    client :=&gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}returnNewClientWithCodec(client)}// 當然也提供自定義的codec,你可以使用thrift協議、messagepack等來擴展                 // codec to encode requests and decode responses.                               
func NewClientWithCodec(codec ClientCodec)*Client{                            
    client :=&Client{                                                          
        codec:   codec,                                                         
        pending: make(map[uint64]*Call),}                                                                           
    go client.input()return client                                                               
}

type gobClientCodec struct{                                                    
    rwc    io.ReadWriteCloser                                                   
    dec    *gob.Decoder                                                         
    enc    *gob.Encoder                                                         
    encBuf *bufio.Writer}

最後,NewClient會後台開啟一枚goroutine,就是接受server返回然後轉發具體調用者了。

func (client *Client) input(){var err error                                                             
    var response Responsefor err ==nil{// 二話不說先獲取Response的頭                                                    
        response =Response{}                                                   
        err = client.codec.ReadResponseHeader(&response)if err !=nil{break}// 頭部中包含了序列號,用於定位pending map使用的                                                               
        seq := response.Seq// 小粒度鎖刪除map,獲取call對象                                                
        client.mutex.Lock()                                                     
        call := client.pending[seq]delete(client.pending, seq)                                             
        client.mutex.Unlock()switch{// 如果pending找不到,那麼肯定是異常了                                                              case call ==nil:// We've got no pending call. That usually means that               // WriteRequest partially failed, and call was already              // removed; response is a server telling us about an                // error reading request body. We should still attempt              // to read error body, but there's no one to give it to.            
            err = client.codec.ReadResponseBody(nil)if err !=nil{                                                     
                err = errors.New("reading error body: "+ err.Error())}// rpc 報錯了,解不開什麼的都有可能                                                          case response.Error!="":// We've got an error response. Give this to the request;           // any subsequent requests will get the ReadResponseBody            // error if there is one.                                           
            call.Error=ServerError(response.Error)                            
            err = client.codec.ReadResponseBody(nil)if err !=nil{                                                     
                err = errors.New("reading error body: "+ err.Error())}                                                                   
            call.done()default:// 默認還是正常的處理,獲取Body給Reply,讓調用者可見                                                               
            err = client.codec.ReadResponseBody(call.Reply)if err !=nil{                                                     
                call.Error= errors.New("reading body "+ err.Error())}                                                                   
            call.done()}}// 如果有啥不可逆的異常,那麼只能shutdown client了。全部退出吧                                                                         // Terminate pending calls.                                                 
    client.reqMutex.Lock()                                                      
    client.mutex.Lock()                                                         
    client.shutdown =true                                                      
    closing := client.closing                                                   
    if err == io.EOF {if closing {                                                            
            err =ErrShutdown}else{                                                                
            err = io.ErrUnexpectedEOF}}// 之前pending的也一個個結束吧,避免調用者都等待                                                                           for _, call := range client.pending {                                       
        call.Error= err                                                        
        call.done()}                                                                           
    client.mutex.Unlock()                                                       
    client.reqMutex.Unlock()if debugLog && err != io.EOF &&!closing {                                  
        log.Println("rpc: client protocol error:", err)}}

最後call.done做了什麼了,相比你也猜到:

// 把call對象傳遞給調用者,主要是獲取內部的Error
func (call *Call)done(){select{case call.Done<- call:// ok                                                                   default:// We don't want to block here.  It is the caller's responsibility to make// sure the channel has enough buffer space. See comment in Go().       if debugLog {                                                           
            log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")}}}

大致的分析就結束了,但是完整的rpc框架,還應該包括,服務發現,服務降級,服務追蹤,服務容錯等, 服務發現:可以使用zk,以及配合client定制的方式實現

服務降級:可以在zk中指定服務質量,以及根據回饋系統來drop request

服務追蹤:最近我在看Twitter的Zipkin和Google的Dapper,對核心rpc庫修改的方式避免大量植入代碼,但是golang要做到這點有點困難,一是AOP不好支持,所以現在只能考慮用侵入代碼,有更好思路的可以聯系我!

服務容錯:因為input本身單連接請求獲取server,有可能<-call一直不返回,導致業務大量hang,這個可以考慮加上一些channel的timeout特性來實現,只不過浪費了一些內存。

總體來說net/rpc還是一個不錯的框架,但是幾個地方需要考慮,一個是全局大鎖reqMutex,另外是call對象會大量創建(可否考慮call pool等)

Golang 1.4 net/rpc server源碼解析 http://www.linuxidc.com/Linux/2015-04/116467.htm

Ubuntu 14.04 上搭建 Golang 開發環境配置 http://www.linuxidc.com/Linux/2015-02/113977.htm

Linux系統入門學習-在Linux中安裝Go語言 http://www.linuxidc.com/Linux/2015-02/113159.htm

Ubuntu 安裝Go語言包 http://www.linuxidc.com/Linux/2013-05/85171.htm

《Go語言編程》高清完整版電子書 http://www.linuxidc.com/Linux/2013-05/84709.htm

Go語言並行之美 -- 超越 “Hello World” http://www.linuxidc.com/Linux/2013-05/83697.htm

我為什麼喜歡Go語言 http://www.linuxidc.com/Linux/2013-05/84060.htm

Go語言內存分配器的實現 http://www.linuxidc.com/Linux/2014-01/94766.htm

Copyright © Linux教程網 All Rights Reserved