在上一节我们使用gRPC实现了客户端和服务端的一对一通讯,也就是客户端向服务端发出一个请求,服务端返回一个结果。但是在很多场景下可能需要客户端向服务端连续发送多个请求后,服务端才能进行处理然后返回一个结果,例如客户端向服务端发送多个订单号,让服务端对订单号进行记录,然后服务端把所有订单号记录后返回结果;或者是客户端发送一个订单号查询所有大于给定订单号的交易记录,然后服务端返回满足条件的十几条记录等。
我们首先看看服务端给客户端返回多条记录的情形。在gRPC中,可以连续发送多条数据的对象叫stream,该对象支持异步发送,假设客户端要查询所有订单号大于10的交易记录,假设在服务端存储了满足条件的记录有20条,那么服务端可以先返回5条,等5分钟后再返回10条,然后等20分钟后再返回5条,因此客户端在接收记录时需要做相应的异步处理。
我们首先修改proto文件如下:
代码语言:javascript复制ervice OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns(Order);
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}
上面代码中的stream表明,当客户端通过searchOrders接口向服务器发出请求时,它需要通过stream对象来获取一系列从服务器返回的Order数据。按照上一节的方法再次编译proto文件后,我们看看它内容的改变,使用searchOrders作为关键字在生成的pb.go文件中查询我们可以看到如下内容:
代码语言:javascript复制type OrderManagementClient interface {
GetOrder(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (*Order, error)
SearchOrders(ctx context.Context, in *wrappers.StringValue, opts ...grpc.CallOption) (OrderManagement_SearchOrdersClient, error)
}
。。。
type OrderManagement_SearchOrdersClient interface {
Recv() (*Order, error)
grpc.ClientStream
}
type orderManagementSearchOrdersClient struct {
grpc.ClientStream
}
func (x *orderManagementSearchOrdersClient) Recv() (*Order, error) {
m := new(Order)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
这段代码表明客户端在调用searchOrders接口时它会获得一个名为OrderManagement_SearchOrdersClient的对象,这个对象实现了一个接口叫Recv,我们不难猜测到时候客户端需要调用Recv()来接收服务端返回的一系列Order数据。继续往下查可以看到如下代码:
代码语言:javascript复制// OrderManagementServer is the server API for OrderManagement service.
type OrderManagementServer interface {
GetOrder(context.Context, *wrappers.StringValue) (*Order, error)
SearchOrders(*wrappers.StringValue, OrderManagement_SearchOrdersServer) error
}
。。。。
type OrderManagement_SearchOrdersServer interface {
Send(*Order) error
grpc.ServerStream
}
type orderManagementSearchOrdersServer struct {
grpc.ServerStream
}
func (x *orderManagementSearchOrdersServer) Send(m *Order) error {
return x.ServerStream.SendMsg(m)
}
上面代码代码表明,服务端在实现searchOrders接口时需要使用一个名为OrderManagement_SearchOrdersServer的对象,它用于一个接口叫Send,我们不难猜测服务端将调用这个接口给客户端发送一系列Order数据,我们首先看服务端代码的实现,在server/main.go中增加代码如下:
代码语言:javascript复制func (s *server) SearchOrders(searchQuery *wrappers.StringValue,
stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
log.Print(key, order)
for _, itemStr := range order.Items {
log.Print(itemStr)
if strings.Contains(itemStr, searchQuery.Value) {
err := stream.Send(&order)
if err != nil {
return fmt.Errorf("error sending message to stream: %v", err)
}
log.Print("Matching Order Found: " key)
break
}
}
}
return nil //返回nil,gRPC会关闭服务器发往客户端的数据管道
}
服务端通过实现SearchOrders接口来执行业务逻辑,其中stream的类型为OrderManagement_SearchOrdersServer,它有gRPC框架传给我们,通过前面的分析我们知道它有接口Send, 函数的输入参数searchQuery其实就是客户端发送过来的订单号字符串,代码从该数据结构拿到订单号后,从数据存储中进行查询,把所有查到的满足条件的Order数据通过Send发送给客户端。这里需要注意的是,客户端在接收数据过程中可能由于多种原因中断连接,这时服务端调用Send就会返回错误,同时还需要注意的是当服务端发送完所有数据后,一定要return nil,这样gRPC才会把发送管道给关闭调。
同理我们看客户端的实现,在client/main.go的main函数中添加如下代码:
代码语言:javascript复制
searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
//如果server 使用stream传输结果,客户端需要使用Recv()接收多个返回
for {
searchOrder, err := searchStream.Recv()
if err == io.EOF {
log.Print("EOF")
break
}
if err == nil {
log.Print("Search result: ", searchOrder)
}
}
从前面代码查询可以看到,客户端调用SearchOrder时会返回一个orderManagementSearchOrdersClient对象,它实现了接口Recv()用来接收服务端发送来的一连串数据,所以在上面代码实现中,我们在for循环中调用Recv()接口不断接收服务端发送的数据,如果数据发送完了,前面服务端通过return nil断掉连接后,客户端就会在调用Recv时得到io.EOF错误,这是就可以中断对Recv()的调用。
以上是客户端发送一个请求,服务端返回一系列结果,我们看看反过来,客户端发送一系列请求,服务端返回一个结果,首先还是修改proto文件,增加一个接口定义:
代码语言:javascript复制service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns(Order);
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
}
updateOrders就是新增加的接口,注意到它对应的输入参数使用了stream来修饰,也就是说客户端会给服务端连续发送一系列Order数据,服务端处理后只返回一个StringValue结构,我们可以使用前面的搜索方法在新编译后的pb.go文件里查询新增加的接口,同样道理,服务端在实现该接口是,也是在一个for循环中使用Recv接口来获取客户端发送的一系列数据,在server/main.go中添加代码如下:
代码语言:javascript复制func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs: "
for {
order, err := stream.Recv()
if err == io.EOF {
//通知客户端不用继续发送
return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed" ordersStr})
}
orderMap[order.Id] = *order
log.Printf("Order ID ", order.Id, ": Updated")
ordersStr = order.Id ", "
}
}
代码的实现逻辑跟前面客户端实现的服务请求逻辑一样,相当于服务端和客户端的角色颠倒了一下。这里需要注意的是服务端如何给客户端返回结果,代码中调用了SendAndClose,它把返回结果传输给客户端的同时将连接关闭,于是客户端就不能继续再给服务端发送数据。我们看看客户端的实现,在client/main.go中添加代码如下:
代码语言:javascript复制updOrder1 := pb.Order{Id: "102", Items:[]string{"Google Pixel 3A", "Google Pixel Book"}, Destination:"Mountain View, CA", Price:1100.00}
updOrder2 := pb.Order{Id: "103", Items:[]string{"Apple Watch S4", "Mac Book Pro", "iPad Pro"}, Destination:"San Jose, CA", Price:2800.00}
updOrder3 := pb.Order{Id: "104", Items:[]string{"Google Home Mini", "Google Nest Hub", "iPad Mini"}, Destination:"Mountain View, CA", Price:2200.00}
updateStream, err := client.UpdateOrders(ctx)
if err != nil {
log.Fatalf("%v.UpdateOrders(_) = , %v", client, err)
}
if err := updateStream.Send(&updOrder1); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
}
if err := updateStream.Send(&updOrder2); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
}
if err := updateStream.Send(&updOrder3); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
}
updateRes, err := updateStream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
}
log.Printf("Update orders res: %s", updateRes)
客户端先是构造一系列Order数据然后分别调用多次Send传递给服务端,如果客户端没有多余数据要传输后,它调用CloseAndRecv(),这个函数会让服务端的Recv()返回io.EOF错误,然后客户端阻塞等待服务端将处理结果返回。
最后我们看客户端给服务端发送一系列数据,然后服务端返回一系列结果给客户端的情况。假设客户端给服务端发送了一系列订单信息,服务端收到订单信息后,把收货地址相同的货物信息合在一起发送给客户端,我们用shipment表示收货地址相同的货物信息组合。如果客户端发送order1, order2,order3, order4 等4个订单号给服务端,其中order1 ,order3 对应货物的收货地址一样, order2, order4对应的收货地址一样,于是服务端就返回两个shipment结构,第一个对应order1, order3, 第二个对应order2, order4,我们先看proto文件的修改:
代码语言:javascript复制service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns(Order);
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
}
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
message CombinedShipment {
string id = 1;
string status = 2;
repeated Order orderList = 3;
}
我们先看服务端的实现,在server/main.go中添加如下代码:
代码语言:javascript复制func (s *server) ProcessOrder(stream pb.OrderManagement_ProcessOrdersServer) error {
batchMarker := 1
var combinedShipmentMap = make(map[string]pb.CombinedShipment)
for {
orderId, err := stream.Recv()
log.Printf("Reading Proc order: %s", orderId)
if err == io.EOF {
log.Printf("EOF: %s", orderId)
for _, shipment := range combinedShipmentMap {
if err := stream.Send(&shipment); err != nil {
return err
}
}
return nil //返回nil,gRPC框架会关闭调server发送给客户端的管道
}
if err != nil {
log.Println(err)
return err
}
destination := orderMap[orderId.GetValue()].Destination
shipment, found := combinedShipmentMap[destination]
if found {
ord := orderMap[orderId.GetValue()]
shipment.OrdersList = append(shipment.OrderList, &ord)
combinedShipmentMap[destination] = shipment
} else {
comShip := pb.CombinedShipment{Id: "cmb - " (orderMap[orderId.GetValue()].Destination), Status: "Processed!",}
ord := orderMap[orderId.GetValue()]
comShip.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = comShip
log.Print(len(comShip.OrdersList), comShip.GetId())
}
if batchMarker == orderBatchSize {
for _, comb := range combinedShipmentMap {
log.Printf("Shipping: %v -> %v", comb.Id, len(comb.OrdersList))
if err := stream.Send(&comb); err != nil {
return err
}
}
batchMarker = 0
combinedShipmentMap = make(map[string]pb.CombinedShipment)
} else {
batchMarker
}
}
}
上面代码实现我们只需要注意几点,首先它使用一个stream对象来完成两个功能,一个功能是调用Recv()来接收客户端发送的多个数据,然后同样是这个对象,继续调用它的Send接口给客户端发送多个数据,也就是一个stream对象既负责接收客户端发送的一系列数据,又负责将服务端的一系列处理结果发送给客户端,把握这一点就行,其他那些业务逻辑无关紧要。
我们再看看客户端的实现,在client/main.go中添加如下代码:
代码语言:javascript复制func main() {
。。。
channel := make(chan struct{})
go asncClientBidirectionalRPC(streamProcOrder, channel)
time.Sleep(time.Milliscond * 1000)
if err := streamProcOrder.Send(&wrapper.StringValue{Value: "101"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "101", err)
}
if err := streamProcOrder.CloseSend(); err != nil {
log.Fatal(err)
}
channel <- struct{}{}
}
func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
for {
combinedShipment, errorProcOrder := streamProcOrder.Recv()
if errProcOrder == io.EOF {
break
}
log.Printf("Combined shipment: ", combinedShipment.OrdersList)
}
<-c
}
上面代码实现中有一个关键点需要把握,客户端也是通过一个stream对象来完成数据的发送和接收,同时我们要特别注意到,同一个stream对象发送和接收完全可以在异步的条件下同时进行,所有上面代码在主函数main里通过Send发送请求,然后扔出一个goroutine异步接收服务端发送回来的数据,虽然发送和接收同时进行但客户端不用加锁,也就是gRPC框架保证了发送和接收在异步情况下业务逻辑依然不会出错。
相关代码从上一节的github路径可以获取。