源码地址: github.com/golang/go/t…
1. 基本使用
先来看看调用的官方例子:
- 服务器部分代码:
// content of server.go
package main
import(
"net"
"net/rpc"
"net/http"
"errors"
"log"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func listenTCP(addr string) (net.Listener, string) {
l, e := net.Listen("tcp", addr)
if e != nil {
log.Fatalf("net.Listen tcp :0: %v", e)
}
return l, l.Addr().String()
}
func main() {
rpc.Register(new(Arith)) //注册服务
var l net.Listener
tcpAddr := "127.0.0.1:8080"
l, serverAddr := listenTCP(tcpAddr) //监听TCP连接
log.Println("RPC server listening on", serverAddr)
go rpc.Accept(l)
rpc.HandleHTTP() //监听HTTP连接
httpAddr := "127.0.0.1:8081"
l, serverAddr = listenTCP(httpAddr)
log.Println("RPC server listening on", serverAddr)
go http.Serve(l, nil)
select{}
}
复制代码
rpc调用的功能就是Arith实现了一个Multiply和Divide方法。看main函数,rpc实现了一个注册rpc.Register(new(Arith))
方法,然后启动监听listenTCP(tcpAddr)
,这个是通过net包中的Listen方法,监听的对象可以是TCP连接rpc.Accept(l)
,也可以试HTTP连接http.Serve(l, nil)
,这个是借助net/http包启动HTTPServer.
- 客户端部分代码
// content of client.go
package main
import(
"net/rpc"
"log"
"fmt"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := &Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%dn", args.A, args.B, reply)
// Asynchronous call
clientTCP, err := rpc.Dial("tcp", "127.0.0.1:8080")
if err != nil {
log.Fatal("dialing:", err)
}
quotient := new(Quotient)
divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
if replyCall.Error != nil {
fmt.Println(replyCall.Error)
} else {
fmt.Printf("Arith: %d/%d=%d...%dn", args.A, args.B, quotient.Quo, quotient.Rem)
}
复制代码
客户端代码rpc 提供了两个方法 rpc.DialHTTP
和 rpc.Dial
分别提供监听 HTTP 和 Tcp 连接。然后通过 Call
或者 Go
来调用服务器的方法,二者的区别是一个是同步调用,Go
是异步调用。
运行结果:
代码语言:javascript复制// server.go
➜ server ./serve
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8080
2019/06/23 15:56:15 Test RPC server listening on 127.0.0.1:8081
复制代码
代码语言:javascript复制// client.go
➜ client ./client
Arith: 7*8=56
Arith: 7/8=0...7
复制代码
2.client.go 源码分析
先来看看客户端的源码,先上一张图了解一下客户端代码的主要逻辑:
Dial
andDialHTTP
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
复制代码
Dial
建立在 net.Dial 上,返回一个client对象,DialHTTP
跟Dial
类似,只不过多了一些HTTP的处理,最终都是返回 NewClient(conn)。
NewClient
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
复制代码
NewClient
里做了2件事,第一件事是生成client结构体对象,包括序列化方式,初始化其中对象等等,Go Rpc默认采用的是gob序列化,但也可以用json或者protobuf。第二件事是启动一个goroutine协程,调用了 input
方法,这个client的核心部分,下面再讲。
Call
andGo
上面例子中,生成client对象后,会显式的调用Call
或Go
,表示同步调用和异步调用。下面来看看源码:
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
复制代码
可以看到,client.Call
方法其实也是调用client.Go
,只不过通过chan
进行阻塞。
生成一个Call的结构体,将服务器的调用方法、参数、返回参数,调用结束标记进行组装,然后调用client.send
的方法,将call结构体发给服务器。服务器拿到这些参数后,会通过反射出具体的方法,然后执行对应的函数。下面是Call
结构体的定义:
// Call
type Call struct {
ServiceMethod string // The name of the service and method to call. 服务方法名
Args interface{} // The argument to the function (*struct). 请求参数
Reply interface{} // The reply from the function (*struct). 返回参数
Error error // After completion, the error status. 错误状态
Done chan *Call // Strobes when call is complete.
}
复制代码
client.send
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
复制代码
send 方法是将刚才的call
结构体中的信息发给服务器,首先数据不是直接发给服务器的,而是将请求参数和服务器的方法先赋值给client结构体中的Request结构体,同时在赋值的过程需要加锁。然后再调用Gob的WriteRequest方法,将数据刷到缓存区。
client.inputclient.send
方法是将数据发给Server,而input则相反,获取Server的返回结果Response给客户端。
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
err = client.codec.ReadResponseBody(nil)
....
}
call.done()
}
}
...
}
}
复制代码
主要逻辑是不断循环读取TCP上的流,把Header解析成Response对象,以及将Body解析到call.Reply对象,解析完后触发call中的done函数。这样客户端就可以拿到Reply对象就是服务器返回的结果,可以打印获取其中的值。
总结:
描述完这几个方法,在回头看开始的client.go
的流程图就清晰了,可以说是分两条线,一条线显示的调用发送请求数据,另外一条线则起协程获取服务器的返回数据。
3. server.go 源码分析
话不多说,先来一张图了解一下大概:
整体分三部分,第一部分注册服务器定义的方法,第二部分监听客户端的请求,解析获取到客户端的请求参数。第三部分拿到请求参数执行服务器的调用函数,将返回结果返回给客户端。
整个过程其实可以对比是一次socket的调用过程。
register
首先来看一下server的结构体:
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}
type Server struct {
serviceMap sync.Map // map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}
复制代码
看英语注释就比起清楚具体是做什么的,Server存储服务器的service以及其请求的Request和Response,这二个就是跟客户端约定的协议,如下:
代码语言:javascript复制type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
type Response struct {
ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request
Error string // error, if any.
next *Response // for free list in Server
}
复制代码
service 存储服务器需要注册的方法,methodType就是具体方法的属性。
所以要想客户端进行远程调用服务器的方法,前提是在调用之前,服务器的方法均已加载在Server结构体中,所以需要服务器显示的调用register方法,下面看一下里面核心的代码:
代码语言:javascript复制func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
...
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true)
...
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
...
}
...
}
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
argType := mtype.In(1)
...
replyType := mtype.In(2)
...
methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
}
return methods
}
复制代码
这段代码就是通过反射把结构体实现的方法的一些属性获取到,包括本身可执行的方法对象、名称、请求参数、返回参数。
最终存储到server的serviceMap中。客户端调用服务器的方法的结构为 struct.method,这样只需要按 . 进行分割,拿到struct名称和method名称则可以通过再serviceMap获取到方法,执行获得结果。
注册完方法后,接下来就是监听客户端的请求了。
Accept
先来看看 Accept
的代码:
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
复制代码
通过 net 包中的监听tcp端口,然后起了一个协程,来看看这个协程里做了什么?
代码语言:javascript复制func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
...
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
...
}
复制代码
这段也好理解,ServeConn 将gob序列化的方法和conn保存到gobServerCodec结构体,然后调用了server.ServeCodec方法,这个方式做的事情就是将客户端传过来的包解析序列化解析,将请求参数,待返回的变量,以及是调服务器哪个方法,这些均在上面的 server.readRequest方法处理。
代码语言:javascript复制func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, keepReading bool, err error) {
service, mtype, req, keepReading, err = server.readRequestHeader(codec)
...
}
func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
// Grab the request header.
req = server.getRequest()
...
dot := strings.LastIndex(req.ServiceMethod, ".")
...
serviceName := req.ServiceMethod[:dot]
methodName := req.ServiceMethod[dot 1:]
// Look up the request.
svci, ok := server.serviceMap.Load(serviceName)
...
svc = svci.(*service)
mtype = svc.method[methodName]
...
}
return
}
复制代码
核心的功能再 readRequestHeader 中,做的一件事就是将客户端传过来的 struct.method,按 . 进行分割,然后拿到serviceName和methodName,然后再去server.serviceMap中拿到具体的服务和方法执行对象。
拿到之后,会起一个协程,调service.call方法,这里面做的事情就是执行服务器服务的方法,拿到返回结果,再调用WriteReponse,将数据写回去。然后客户端的 input 方法循环获取结果。这样形成闭环。
下面看看service.call方法:
代码语言:javascript复制func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
...
function := mtype.method.Func
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
...
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
...
}
复制代码
实现的功能跟上面分析的一样,通过mtype拿到函数对象,然后调用反射的Call方法执行得到结果,最后调用server.sendResponse发送发回结果。
看到这里再回过来看上面画的Server代码流程图,就非常清晰了。
Go Rpc源码解读就到这里。
4. 总结
Go RPC源码目前官方已经没有维护,官方推荐使用grpc,下一篇计划分析grpc的源码。
下面总结一下优缺点:
- 优点:
- 代码精简,可扩展性高。
- 缺点:
- 同步调用时,通过chan阻塞异步的Go方法,并没有处理超时,这样如果超时将导致大量的协程无法释放。
- 可能存在内存泄漏的情况,因为客户端的请求数据在Server结构体中,如果Server端不返回则不会清理其中的数据,客户端的Go函数退出并不会清理其中的内容,所以Server结构体会一直存储,从而内存泄漏。
目前开源的RPC框架已经不是像这种简单的网络调用了,还会包括很多服务治理的功能,比如服务注册与发现、限流熔断、监控等等。