Java gRPC Bidirectional Communication

2022-06-21 23:50:52

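Background

gRPC's streaming mode keeps a call open in both directions, which lets the server push messages to the client instead of only answering requests.

Example

A simple example that implements bidirectional communication.

proto file

Mark both the request and the response with the stream keyword.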

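// `required` is only valid in proto2, so declare the syntax explicitly
syntax = "proto2";

message CommandMessage {
  required int32  type  = 1;  // command type
  optional string data  = 2;  // command payload
}

service CommandStreamService {
  // `stream` on both the request and the response makes the RPC bidirectional
  rpc CommandDispatch(stream CommandMessage) returns (stream CommandMessage) {}
}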

Server

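import com.zdpower.grpc.CommandStreamServiceGrpc; // generated class; package assumed to match Hello
import com.zdpower.grpc.Hello;
import io.grpc.stub.StreamObserver;

public class CommandStreamServerImpl extends CommandStreamServiceGrpc.CommandStreamServiceImplBase {
    // Used to push messages to the client; only the most recently opened stream is kept
    private volatile StreamObserver<Hello.CommandMessage> sendCmdObServer;

    @Override
    public StreamObserver<Hello.CommandMessage> commandDispatch(
            StreamObserver<Hello.CommandMessage> responseObserver) {
        // server => client
        sendCmdObServer = responseObserver;

        // client => server
        return new StreamObserver<Hello.CommandMessage>() {
            @Override
            public void onNext(Hello.CommandMessage value) {
                System.out.println("Server received: " + value.getData());
            }

            @Override
            public void onError(Throwable t) {
                // The stream is broken, so stop pushing to it
                System.err.println("Client stream failed: " + t);
                if (sendCmdObServer == responseObserver) {
                    sendCmdObServer = null;
                }
            }

            @Override
            public void onCompleted() {
                // The client has finished sending; the response stream stays open
                // so the server can still push messages
            }
        };
    }

    // Called from a controller, for testing
    public void sendUpdateCmd() {
        StreamObserver<Hello.CommandMessage> observer = sendCmdObServer;
        if (observer == null) {
            return; // no client has connected yet
        }
        // StreamObserver is not thread-safe, so serialize writes to it
        synchronized (observer) {
            observer.onNext(Hello.CommandMessage.newBuilder().setType(1).setData("update").build());
        }
    }
}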
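
The server above keeps a single observer, so only the most recently connected client can receive pushed messages. A minimal sketch of tracking several clients follows. It assumes each client can be identified by some string ID (how that ID is obtained is left open), and it uses a stand-in `Observer` interface with the same three methods as `io.grpc.stub.StreamObserver` so the example compiles without gRPC on the classpath. `ClientRegistry` and `RecordingObserver` are hypothetical names, not part of gRPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in with the same three methods as io.grpc.stub.StreamObserver,
// so this sketch compiles without gRPC on the classpath.
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Hypothetical helper: tracks one outbound stream per connected client.
class ClientRegistry<T> {
    private final Map<String, Observer<T>> clients = new ConcurrentHashMap<>();

    // Call when a client opens the stream (e.g. at the top of commandDispatch).
    void register(String clientId, Observer<T> observer) {
        clients.put(clientId, observer);
    }

    // Call from the inbound observer's onError / onCompleted.
    void unregister(String clientId) {
        clients.remove(clientId);
    }

    // Push one message to every registered client; returns how many accepted it.
    int broadcast(T message) {
        int delivered = 0;
        for (Map.Entry<String, Observer<T>> entry : clients.entrySet()) {
            Observer<T> observer = entry.getValue();
            try {
                // Observers are not thread-safe, so serialize writes per stream.
                synchronized (observer) {
                    observer.onNext(message);
                }
                delivered++;
            } catch (RuntimeException ex) {
                // A write to a broken stream may throw; drop that client.
                clients.remove(entry.getKey(), observer);
            }
        }
        return delivered;
    }

    int size() {
        return clients.size();
    }
}

// Test double that records what it receives.
class RecordingObserver implements Observer<String> {
    final List<String> received = new ArrayList<>();
    public void onNext(String value) { received.add(value); }
    public void onError(Throwable t) { }
    public void onCompleted() { }
}

class ClientRegistryDemo {
    public static void main(String[] args) {
        ClientRegistry<String> registry = new ClientRegistry<>();
        RecordingObserver agent = new RecordingObserver();
        registry.register("agent-1", agent);
        registry.broadcast("update");
        System.out.println(agent.received);
    }
}
```

In the real service, `commandDispatch` would call `register` with the `responseObserver`, and `sendUpdateCmd` would call `broadcast` with a `Hello.CommandMessage`.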

Client

import com.zdpower.grpc.CommandStreamServiceGrpc; // generated class; package assumed to match Hello
import com.zdpower.grpc.Hello;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class AgentRunApp {
    private StreamObserver<Hello.CommandMessage> receivedEnd;

    public static void main(String[] args) {
        try {
            AgentRunApp app = new AgentRunApp();
            // run() loops forever, which keeps the JVM and the channel alive
            app.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run() throws Exception {
        // Connect
        ManagedChannel channel = ManagedChannelBuilder.forTarget("127.0.0.1:50054").usePlaintext().build();
        CommandStreamServiceGrpc.CommandStreamServiceStub stub = CommandStreamServiceGrpc.newStub(channel);

        // Handles messages pushed by the server: server => client
        receivedEnd = new StreamObserver<Hello.CommandMessage>() {
            @Override
            public void onNext(Hello.CommandMessage message) {
                System.out.println("Client received: " + message.getData());
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Server stream failed: " + throwable);
            }

            @Override
            public void onCompleted() {
                System.out.println("Server closed the stream");
            }
        };

        // Open the bidirectional stream once and reuse it: client => server
        StreamObserver<Hello.CommandMessage> sendEnd = stub.commandDispatch(receivedEnd);

        // Simulate the client sending a message every second
        while (true) {
            sendEnd.onNext(Hello.CommandMessage.newBuilder().setType(0).setData("client send").build());
            Thread.sleep(1000);
        }
    }
}
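
The client's `onNext` above only prints the payload, although each `CommandMessage` also carries a `type` (the server's test push uses type 1 with data "update"). One possible way to act on that field is a small dispatcher keyed by type. This is a sketch under assumptions: `CommandDispatcher` is a hypothetical helper, and it takes the type and data as plain values so it compiles without the generated classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a command to the handler registered for its type.
class CommandDispatcher {
    private final Map<Integer, Consumer<String>> handlers = new HashMap<>();

    // Register a handler for one command type; returns this for chaining.
    CommandDispatcher on(int type, Consumer<String> handler) {
        handlers.put(type, handler);
        return this;
    }

    // Returns true if a handler was registered for this type and was invoked.
    boolean dispatch(int type, String data) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(data);
        return true;
    }
}

class CommandDispatcherDemo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        CommandDispatcher dispatcher = new CommandDispatcher()
                .on(1, data -> log.append("update:").append(data));

        // Inside the client's onNext this would be:
        // dispatcher.dispatch(message.getType(), message.getData());
        dispatcher.dispatch(1, "update");
        System.out.println(log);
    }
}
```

Returning a boolean from `dispatch` lets the caller log or ignore unknown command types instead of failing on them.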
