4.实战gRPC四种通信模式

2023-10-30 16:24:20 浏览数 (1)

gRPC 提供了四种主要的通信模式:单一请求-单一响应(Unary)、客户端流式、服务器流式和双向流式。每种模式都有不同的特点和适用场景。下面是对这四种通信模式的详细介绍以及它们的使用场景

单一请求-单一响应

定义

在单一请求-单一响应模式中,客户端发送一个请求给服务器,然后等待服务器返回一个响应。这是最常见、最简单的通信模式

使用场景

  • 当需要获取某个资源的详细信息时,例如获取用户的个人资料。
  • 当需要执行简单的计算并获得结果时,例如进行数字运算。

代码实现

定义proto
代码语言:javascript复制
syntax = "proto3";

package com.lglbc.hello;

service OpenAPI {
  rpc SingleRequest (Request) returns (Response);
}

message Request {
  string name = 1;
}

message Response {
  string message = 1;
}

使用前面同样的方式生成java文件

实现服务端接口
代码语言:javascript复制
public class Service extends OpenAPIGrpc.OpenAPIImplBase {
    @Override
    public void singleRequest(Mode.Request request, StreamObserver<Mode.Response> responseObserver) {
        Mode.Response response = Mode.Response.newBuilder()
                .setMessage("你好 " request.getName()  ",单一请求")
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}
实现客户端调用
代码语言:javascript复制
public class Client {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090).usePlaintext().build();
        OpenAPIGrpc.OpenAPIBlockingStub stub = OpenAPIGrpc.newBlockingStub(channel);
        Mode.Request request = Mode.Request.newBuilder().setName("乐哥聊编程").build();
        Mode.Response response = stub.singleRequest(request);
        System.out.println("调用结果: "   response.getMessage());
        channel.shutdown();
    }
}
创建服务

这段代码在后面几种模式中是通用的,就不再写了

代码语言:javascript复制
public class Server {
    public static void main(String[] args) throws Exception {
        io.grpc.Server server = ServerBuilder.forPort(9090)
                .addService(new Service())
                .build();
        server.start();
        System.out.println("gRPC服务启动");
        server.awaitTermination();
    }
}

启动之后进行测试

客户端流式

定义

在客户端流式模式中,客户端通过流式方式发送多个请求给服务器,然后等待服务器返回一个响应

使用场景

  • 当客户端需要向服务器发送一系列请求,并等待服务器返回结果时,例如逐步上传文件,每次发送一个数据块。
  • 当客户端需要发送大量的数据给服务器,但服务器只返回一个结果,例如批量处理数据。

代码实现

定义proto
代码语言:javascript复制
syntax = "proto3";

package com.lglbc.hello;

service OpenAPI {
  rpc ClientStreaming (stream Request) returns (Response);
}

message Request {
  string name = 1;
}

message Response {
  string message = 1;
}

实现服务端接口
代码语言:javascript复制
public class Service extends OpenAPIGrpc.OpenAPIImplBase {

    @Override
    public StreamObserver<Mode.Request> clientStreaming(StreamObserver<Mode.Response> responseObserver) {
        return new StreamObserver<Mode.Request>() {
            @Override
            public void onNext(Mode.Request request) {
                // 处理接收到的请求
                System.out.println("收到客户端请求:" request.getName());
            }

            @Override
            public void onError(Throwable t) {
                // 处理错误
                System.out.println("错误信息:" t.getMessage());
            }

            @Override
            public void onCompleted() {
                // 发送响应
                Mode.Response response = Mode.Response.newBuilder()
                        .setMessage("消息处理完成")
                        .build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    }
}
客户端实现
代码语言:javascript复制
public class Client {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
                .usePlaintext()
                .build();

        OpenAPIGrpc.OpenAPIStub stub = OpenAPIGrpc.newStub(channel);

        StreamObserver<Mode.Response> responseObserver = new StreamObserver<Mode.Response>() {
            @Override
            public void onNext(Mode.Response response) {
                System.out.println("Received response: "   response.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error: "   t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Server completed.");
            }
        };

        StreamObserver<Mode.Request> requestObserver = stub.clientStreaming(responseObserver);

        for (int i = 0; i < 5; i  ) {
            Mode.Request request = Mode.Request.newBuilder()
                    .setName("Request "   (i   1))
                    .build();
            requestObserver.onNext(request);
        }

        // 结束请求流
        requestObserver.onCompleted();

        // 等待完成
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        channel.shutdown();
    }

}

服务端流式

定义

在服务器流式模式中,服务器通过流式方式发送多个响应消息给客户端,客户端等待服务器发送完所有响应后结束

使用场景

  • 当服务器需要向客户端发送大量数据或实时数据时,例如服务器实时向客户端推送消息,流式传输文件。
  • 当服务器需要返回多个相关的结果给客户端,例如搜索结果列表。

代码实现

定义proto
代码语言:javascript复制
syntax = "proto3";

package com.lglbc.hello;

service OpenAPI {
  rpc ServerStreaming (Request) returns (stream Response);
}

message Request {
  string name = 1;
}

message Response {
  string message = 1;
}

实现服务端接口
代码语言:javascript复制
public class Service extends OpenAPIGrpc.OpenAPIImplBase {
    @Override
    public void serverStreaming(Mode.Request request, StreamObserver<Mode.Response> responseObserver) {
        for (int i = 0; i < 10; i  ) {
            // 发送多个响应消息
            Mode.Response response = Mode.Response.newBuilder()
                    .setMessage("服务端消息:" i)
                    .build();
            responseObserver.onNext(response);
        }
        responseObserver.onCompleted();
    }
}
实现客户端
代码语言:javascript复制
public class Client {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
                .usePlaintext()
                .build();

        OpenAPIGrpc.OpenAPIBlockingStub stub = OpenAPIGrpc.newBlockingStub(channel);

        Mode.Request request = Mode.Request.newBuilder()
                .setName("乐哥聊编程")
                .build();

        stub.serverStreaming(request)
                .forEachRemaining(response -> {
                    System.out.println("Received response: "   response.getMessage());
                });

        channel.shutdown();
    }

}

双向流式

定义

在双向流式模式中,客户端和服务器都可以通过流式方式同时发送和接收多个消息

使用场景

  • 当客户端和服务器之间需要实时双向通信时,例如聊天应用、实时协作工具。
  • 当客户端和服务器需要交换大量数据,而不仅限于一次请求-响应交互。

代码实现

定义proto
代码语言:javascript复制
syntax = "proto3";

package com.lglbc.hello;

service OpenAPI {
  rpc BidirectionalStreaming (stream Request) returns (stream Response);
}

message Request {
  string name = 1;
}

message Response {
  string message = 1;
}
实现服务端接口
代码语言:javascript复制
public class Service extends OpenAPIGrpc.OpenAPIImplBase {
    @Override
    public StreamObserver<Mode.Request> bidirectionalStreaming(StreamObserver<Mode.Response> responseObserver) {
        return new StreamObserver<Mode.Request>() {
            @Override
            public void onNext(Mode.Request request) {
                // 处理接收到的请求
                System.out.println("收到客户端请求:" request.getName());
                // 发送多个响应消息
                Mode.Response response = Mode.Response.newBuilder()
                        .setMessage("服务端消息:")
                        .build();
                responseObserver.onNext(response);
            }

            @Override
            public void onError(Throwable t) {
                // 处理错误
                System.out.println("错误信息:" t.getMessage());
            }

            @Override
            public void onCompleted() {
                // 发送响应
                Mode.Response response = Mode.Response.newBuilder()
                        .setMessage("消息处理完成")
                        .build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        };
    }
}

实现客户端

代码语言:javascript复制
public class Client {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
                .usePlaintext()
                .build();

        OpenAPIGrpc.OpenAPIStub stub = OpenAPIGrpc.newStub(channel);

        Mode.Request request = Mode.Request.newBuilder()
                .setName("乐哥聊编程")
                .build();
        StreamObserver<Mode.Response> responseObserver = new StreamObserver<Mode.Response>() {
            @Override
            public void onNext(Mode.Response response) {
                System.out.println("Received response: "   response.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error: "   t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Server completed.");
            }
        };
        StreamObserver<Mode.Request> requestObserver = stub.bidirectionalStreaming(responseObserver);


        for (int i = 0; i < 5; i  ) {
            Mode.Request request2 = Mode.Request.newBuilder()
                    .setName("Request "   (i   1))
                    .build();
            requestObserver.onNext(request2);
        }

        // 结束请求流
        requestObserver.onCompleted();

        // 等待完成
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        channel.shutdown();
    }

}

0 人点赞