go标准库rpc实践

2022-07-07 15:45:12 浏览数 (1)

前言

哈喽,大家好,我是asong,这是我的第六篇原创文章。这一篇分享一下go的标准库rpc,并进行实践。最近要做一个项目,需要用到rpc,现在先写一个小样例,熟悉一下。

什么是RPC

RPC(Remote Procedure Call)即远程过程调用,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络细节的应用程序通信协议。RPC协议构建于TCP或UDP,或者是HTTP上。

在网络上找了一个RPCclient-server模型。就不自己画了,比较丑!!!

这个流程就是客户端以本地调用方式(即以接口的方式)调用服务;客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);客户端通过sockets将消息发送到服务端;服务端存根(server stub)收到消息后进行解码(将消息对象反序列化);服务端存根根据解码结果调用本地服务;本地服务执行并将返回结果返回给服务端存根;服务端存根将返回结果打包成消息(将结果消息对象序列化);服务端存根听过sockets将消息发送到客户端;客户端存根接收到消息,并进行解码(将结果消息发序列化);客户端(client)得到最终结果。

了解了RPC的过程,我们就是用Go的标准库net/rpc包实现一个官方样例,来了解RPC调用过程。这里使用HTTP进行传输写的样例。

client.go

代码语言:javascript复制
package main

import (
   "fmt"
   "log"
   "net/rpc"
   "time"
)

/**
* 功能:rpc client code
* author: 孙松
* email: 741896420@qq.com
 */

type Args struct {
   A,B int
}

type Quotient struct {
   Quo,Rem int
}

func main()  {
   //调用rpc服务端提供的方法之前,先与rpc服务端建立连接
   client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
   if err != nil {
      log.Fatal("dialHttp error", err)
      return
   }
   //同步调用服务端提供的方法

   args := &Args{7, 8}
   var reply int
   //可以查看源码 其实Call同步调用是用异步调用实现的。后续再详细学习
   err = client.Call("Arith.Multiply", args, &reply) //这里会阻塞三秒

   if err != nil {
      log.Fatal("call Arith.Multiply error", err)
   }
   fmt.Printf("Multiply:%d*%d=%dn", args.A, args.B, reply)

   var quo Quotient

   //异步调用
   divCall := client.Go("Arith.Divide", args, &quo, nil)

   //使用select模型监听通道有数据时执行,否则执行后续程序
   for {
      select {
      case <-divCall.Done:
         fmt.Printf("%d divide %d是%d %d, 退出执行!", args.A, args.B, quo.Quo,quo.Rem)
         return
      default:
         fmt.Println("继续等待....")
         time.Sleep(time.Second * 1)
      }
   }
}

server.go

代码语言:javascript复制
package main

import (
   "errors"
   "fmt"
   "log"
   "net"
   "net/http"
   "net/rpc"
   "os"
)

/**
* 功能:rpc server code
* author: 孙松
* email: 741896420@qq.com
 */

type Args struct {
   A,B int
}

type Quotient struct {
   Que,Rem int
}
type Arith int

//乘积
func (t *Arith)Multiply(a *Args,reply *int)  error{
   *reply = a.A * a.B
   return nil
}

func (t *Arith)Divide(a *Args,quo *Quotient) error {
   if a.B == 0{
      return errors.New("divide by zero")
   }
   quo.Que = a.A / a.B
   quo.Rem = a.A % a.B
   return nil
}
func main()  {
   arith := new(Arith)
   rpc.Register(arith)
   rpc.HandleHTTP()
   l,err := net.Listen("tcp","127.0.0.1:1234")
   if err!= nil{
      log.Fatal("listen error:",err)
   }
   go http.Serve(l,nil)
   fmt.Println("server 启动成功")
   os.Stdin.Read(make([]byte,1))
}

运行结果与分析

Multiply:7*8=56

继续等待....

7 divide 8是0 7, 退出执行!

Process finished with exit code 0

从代码分析,server端rpc服务注册了一个Arith对象,公开了方法为客户端提供调用,采用http协议作为rpc调用的载体,处理请求。client端调用rpc服务端提供的方法之前,先于rpc服务端建立连接,使用Call方法调用远程方法。

在客户端代码中,我们分别使用client.call 和 client.call。client.call是同步调用,client.go是异步调用。为什么会这样呢。这里我查看了源码。通过源码可以看到,client.call底层就是调用client.go。

代码语言:javascript复制
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
   call := new(Call)
   call.ServiceMethod = serviceMethod
   call.Args = args
   call.Reply = reply
   if done == nil {
      done = make(chan *Call, 10) // buffered.
   } else {
      // If caller passes done != nil, it must arrange that
      // done has enough buffer for the number of simultaneous
      // RPCs that will be using that channel. If the channel
      // is totally unbuffered, it's best not to run at all.
      if cap(done) == 0 {
         log.Panic("rpc: done channel is unbuffered")
      }
   }
   call.Done = done
   client.send(call)
   return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
   call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
   return call.Error
}

client.call内部使用一个chanl等待client.go,所以client.call是同步调用。这两个的使用根据自己的使用场景进行选择,注意下就可以了。

总结

好啦,这一篇就结束了。。。

0 人点赞