gRPC 提供了四种主要的通信模式:单一请求-单一响应(Unary)、客户端流式、服务器流式和双向流式。每种模式都有不同的特点和适用场景。下面是对这四种通信模式的详细介绍以及它们的使用场景
单一请求-单一响应
定义
在单一请求-单一响应模式中,客户端发送一个请求给服务器,然后等待服务器返回一个响应。这是最常见、最简单的通信模式
使用场景
- 当需要获取某个资源的详细信息时,例如获取用户的个人资料。
- 当需要执行简单的计算并获得结果时,例如进行数字运算。
代码实现
定义proto
代码语言:javascript复制syntax = "proto3";
package com.lglbc.hello;
service OpenAPI {
rpc SingleRequest (Request) returns (Response);
}
message Request {
string name = 1;
}
message Response {
string message = 1;
}
使用前面同样的方式生成java文件
实现服务端接口
代码语言:javascript复制public class Service extends OpenAPIGrpc.OpenAPIImplBase {
@Override
public void singleRequest(Mode.Request request, StreamObserver<Mode.Response> responseObserver) {
Mode.Response response = Mode.Response.newBuilder()
.setMessage("你好 " request.getName() ",单一请求")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
实现客户端调用
代码语言:javascript复制public class Client {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090).usePlaintext().build();
OpenAPIGrpc.OpenAPIBlockingStub stub = OpenAPIGrpc.newBlockingStub(channel);
Mode.Request request = Mode.Request.newBuilder().setName("乐哥聊编程").build();
Mode.Response response = stub.singleRequest(request);
System.out.println("调用结果: " response.getMessage());
channel.shutdown();
}
}
创建服务
这段代码在后面几种模式中是通用的,就不再写了
代码语言:javascript复制public class Server {
public static void main(String[] args) throws Exception {
io.grpc.Server server = ServerBuilder.forPort(9090)
.addService(new Service())
.build();
server.start();
System.out.println("gRPC服务启动");
server.awaitTermination();
}
}
启动之后进行测试
客户端流式
定义
在客户端流式模式中,客户端通过流式方式发送多个请求给服务器,然后等待服务器返回一个响应
使用场景
- 当客户端需要向服务器发送一系列请求,并等待服务器返回结果时,例如逐步上传文件,每次发送一个数据块。
- 当客户端需要发送大量的数据给服务器,但服务器只返回一个结果,例如批量处理数据。
代码实现
定义proto
代码语言:javascript复制syntax = "proto3";
package com.lglbc.hello;
service OpenAPI {
rpc ClientStreaming (stream Request) returns (Response);
}
message Request {
string name = 1;
}
message Response {
string message = 1;
}
实现服务端接口
代码语言:javascript复制public class Service extends OpenAPIGrpc.OpenAPIImplBase {
@Override
public StreamObserver<Mode.Request> clientStreaming(StreamObserver<Mode.Response> responseObserver) {
return new StreamObserver<Mode.Request>() {
@Override
public void onNext(Mode.Request request) {
// 处理接收到的请求
System.out.println("收到客户端请求:" request.getName());
}
@Override
public void onError(Throwable t) {
// 处理错误
System.out.println("错误信息:" t.getMessage());
}
@Override
public void onCompleted() {
// 发送响应
Mode.Response response = Mode.Response.newBuilder()
.setMessage("消息处理完成")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}
}
客户端实现
代码语言:javascript复制public class Client {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
OpenAPIGrpc.OpenAPIStub stub = OpenAPIGrpc.newStub(channel);
StreamObserver<Mode.Response> responseObserver = new StreamObserver<Mode.Response>() {
@Override
public void onNext(Mode.Response response) {
System.out.println("Received response: " response.getMessage());
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Server completed.");
}
};
StreamObserver<Mode.Request> requestObserver = stub.clientStreaming(responseObserver);
for (int i = 0; i < 5; i ) {
Mode.Request request = Mode.Request.newBuilder()
.setName("Request " (i 1))
.build();
requestObserver.onNext(request);
}
// 结束请求流
requestObserver.onCompleted();
// 等待完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.shutdown();
}
}
服务端流式
定义
在服务器流式模式中,服务器通过流式方式发送多个响应消息给客户端,客户端等待服务器发送完所有响应后结束
使用场景
- 当服务器需要向客户端发送大量数据或实时数据时,例如服务器实时向客户端推送消息,流式传输文件。
- 当服务器需要返回多个相关的结果给客户端,例如搜索结果列表。
代码实现
定义proto
代码语言:javascript复制syntax = "proto3";
package com.lglbc.hello;
service OpenAPI {
rpc ServerStreaming (Request) returns (stream Response);
}
message Request {
string name = 1;
}
message Response {
string message = 1;
}
实现服务端接口
代码语言:javascript复制public class Service extends OpenAPIGrpc.OpenAPIImplBase {
@Override
public void serverStreaming(Mode.Request request, StreamObserver<Mode.Response> responseObserver) {
for (int i = 0; i < 10; i ) {
// 发送多个响应消息
Mode.Response response = Mode.Response.newBuilder()
.setMessage("服务端消息:" i)
.build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
}
}
实现客户端
代码语言:javascript复制public class Client {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
OpenAPIGrpc.OpenAPIBlockingStub stub = OpenAPIGrpc.newBlockingStub(channel);
Mode.Request request = Mode.Request.newBuilder()
.setName("乐哥聊编程")
.build();
stub.serverStreaming(request)
.forEachRemaining(response -> {
System.out.println("Received response: " response.getMessage());
});
channel.shutdown();
}
}
双向流式
定义
在双向流式模式中,客户端和服务器都可以通过流式方式同时发送和接收多个消息
使用场景
- 当客户端和服务器之间需要实时双向通信时,例如聊天应用、实时协作工具。
- 当客户端和服务器需要交换大量数据,而不仅限于一次请求-响应交互。
代码实现
定义proto
代码语言:javascript复制syntax = "proto3";
package com.lglbc.hello;
service OpenAPI {
rpc BidirectionalStreaming (stream Request) returns (stream Response);
}
message Request {
string name = 1;
}
message Response {
string message = 1;
}
实现服务端接口
代码语言:javascript复制public class Service extends OpenAPIGrpc.OpenAPIImplBase {
@Override
public StreamObserver<Mode.Request> bidirectionalStreaming(StreamObserver<Mode.Response> responseObserver) {
return new StreamObserver<Mode.Request>() {
@Override
public void onNext(Mode.Request request) {
// 处理接收到的请求
System.out.println("收到客户端请求:" request.getName());
// 发送多个响应消息
Mode.Response response = Mode.Response.newBuilder()
.setMessage("服务端消息:")
.build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
// 处理错误
System.out.println("错误信息:" t.getMessage());
}
@Override
public void onCompleted() {
// 发送响应
Mode.Response response = Mode.Response.newBuilder()
.setMessage("消息处理完成")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
}
}
实现客户端
代码语言:javascript复制public class Client {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build();
OpenAPIGrpc.OpenAPIStub stub = OpenAPIGrpc.newStub(channel);
Mode.Request request = Mode.Request.newBuilder()
.setName("乐哥聊编程")
.build();
StreamObserver<Mode.Response> responseObserver = new StreamObserver<Mode.Response>() {
@Override
public void onNext(Mode.Response response) {
System.out.println("Received response: " response.getMessage());
}
@Override
public void onError(Throwable t) {
System.err.println("Error: " t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Server completed.");
}
};
StreamObserver<Mode.Request> requestObserver = stub.bidirectionalStreaming(responseObserver);
for (int i = 0; i < 5; i ) {
Mode.Request request2 = Mode.Request.newBuilder()
.setName("Request " (i 1))
.build();
requestObserver.onNext(request2);
}
// 结束请求流
requestObserver.onCompleted();
// 等待完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.shutdown();
}
}