背景
使用grpc的stream流模式,让服务器具备推送消息的能力。
例子
简单例子,实现双向通信
proto 文件
使用stream
关键字
message CommandMessage {
required int32 type = 1;
optional string data = 2;
}
service CommandStreamService {
rpc CommandDispatch(stream CommandMessage) returns (stream CommandMessage){}
}
服务端
代码语言:java复制public class CommandStreamServerImpl extends CommandStreamServiceGrpc.CommandStreamServiceImplBase {
// 用来向客户端推送消息
private StreamObserver sendCmdObServer;
public io.grpc.stub.StreamObserver<com.zdpower.grpc.Hello.CommandMessage> commandDispatch(
io.grpc.stub.StreamObserver<com.zdpower.grpc.Hello.CommandMessage> responseObserver) {
// server => client
sendCmdObServer = responseObserver;
// client => server
return new StreamObserver<Hello.CommandMessage>() {
@Override
public void onNext(Hello.CommandMessage value) {
System.out.println("服务端:" value.getData());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
}
// contoller 测试用
public void sendUpdateCmd() {
sendCmdObServer.onNext(Hello.CommandMessage.newBuilder().setType(1).setData("update").build());
}
}
客户端
代码语言:java复制public class AgentRunApp {
private StreamObserver receivedEnd;
public static void main(String[] args) {
DefaultEventLoopGroup loopGroup = new DefaultEventLoopGroup();
try {
AgentRunApp app = new AgentRunApp();
app.run();
// 堵塞 netty
loopGroup.awaitTermination(30, TimeUnit.DAYS);
} catch (Exception e) {
e.printStackTrace();
}
}
public void run() throws Exception {
// 连接
ManagedChannel channel = ManagedChannelBuilder.forTarget("127.0.0.1:50054").usePlaintext().build();
CommandStreamServiceGrpc.CommandStreamServiceStub stub = CommandStreamServiceGrpc.newStub(channel);
// 客户端处理服务器消息 server=>client
receivedEnd = new StreamObserver<Hello.CommandMessage>() {
@Override
public void onNext(Hello.CommandMessage message) {
System.out.println("客户端收到消息:" message.getData());
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
// 模拟客户端发送消息
while (true){
// client => server
StreamObserver<Hello.CommandMessage> sendEnd = stub.commandDispatch(receivedEnd);
sendEnd.onNext(Hello.CommandMessage.newBuilder().setType(0).setData("client send").build());
sendEnd.onCompleted();
Thread.sleep(1000);
}
}
}