概述
为什么使用grpc
- 相对json是强类型的有scheme定义的
- 社区活跃cncf的基石项目,众多项目使用
- 性能是json的4-5倍,提供高效的进程间通信
- 多语言支持,各类语言都可以轻松使用
- 支持双工流,流式传输数据
- 内置的商业化特性(认证,加密,弹性,元数据交换,压缩,负载均衡,服务发现)
- 与云原生生态系统进行了集成(envoy,prometheus)
其他的选择
- http json
- gRPC:谷歌开源的高性能RPC框架。
- dubbo:阿里开源的RPC框架
- Finagle:twitter的RPC框架
- Thrift:Facebook的RPC框架
- Tars:腾讯的RPC框架
1 定义proto
代码语言:txt复制syntax = "proto3";
option java_package = "cn.beckbi.pb";
option java_outer_classname = "AdInfo";
message Ad {
int32 id = 1;
string name = 2;
string description = 3;
float price = 4;
}
message AdId {
int32 id = 1;
}
service AdRpc {
rpc addAd(Ad) returns (AdId);
rpc getAd(AdId) returns (Ad);
}
2 环境准备
生成代码
- protobuf:compile 生成pb对象
- protobuf:compile-custom 生成grpc代码
maven配置
- os-maven-plugin 监测系统的版本,生成不同的pb对象
- protobuf-maven-plugin 插件生成pb代码,生成grpc代码
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.49.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.12.0</protobuf.version>
<protoc.version>3.12.0</protoc.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<os-maven-plugin.version>1.6.1</os-maven-plugin.version>
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os-maven-plugin.version}</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>${project.basedir}/src/main/resources</protoSourceRoot>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3 java客户端
代码语言:txt复制public class AdClient {
private static final Logger logger = Logger.getLogger(AdClient.class.getName());
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11111)
.usePlaintext()
.build();
AdRpcGrpc.AdRpcBlockingStub stub = AdRpcGrpc.newBlockingStub(channel);
AdInfo.AdId adId = stub.addAd(AdInfo.Ad.newBuilder()
.setName("cpl1")
.setDescription("cpl ad")
.setPrice(31.21f)
.build());
logger.info("set ad " adId.getId() " add success !");
AdInfo.Ad ad = stub.getAd(adId);
logger.info("Ad: " ad.toString());
channel.shutdown();
}
}
4 java服务端
rpc的入口类
代码语言:txt复制package cn.beckbi.server;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import java.util.logging.Logger;
/**
* @program: kgrpc
* @description:
* @author: bikang
* @create: 2022-08-28 08:02
*/
public class AdServer {
private static final Logger logger = Logger.getLogger(AdServer.class.getName());
private Server server;
private void start() throws IOException {
int port = 11111;
server = ServerBuilder.forPort(port)
.addService(new AdRpcImpl())
.build()
.start();
logger.info("Server started, listening on " port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("*** shutting down gRPC server since JVM is shutting down");
AdServer.this.stop();
logger.info("*** server shut down");
}));
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final AdServer server = new AdServer();
server.start();
server.blockUntilShutdown();
}
}
rpc实现的接口
代码语言:txt复制package cn.beckbi.server;
import cn.beckbi.pb.AdInfo;
import cn.beckbi.pb.AdRpcGrpc;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.logging.Logger;
/**
* @program: kgrpc
* @description:
* @author: bikang
* @create: 2022-08-28 08:02
*/
public class AdRpcImpl extends AdRpcGrpc.AdRpcImplBase {
private static final Logger logger = Logger.getLogger(AdRpcImpl.class.getName());
private final Random random = new Random();
private final Map<Integer, AdInfo.Ad> adInfoMap = new HashMap<>(50);
@Override
public void addAd(cn.beckbi.pb.AdInfo.Ad request,
io.grpc.stub.StreamObserver<cn.beckbi.pb.AdInfo.AdId> responseObserver) {
int id = random.nextInt(100000000);
logger.info("id:" id);
request = request.toBuilder().setId(id).build();
adInfoMap.put(id, request);
AdInfo.AdId adId = AdInfo.AdId.newBuilder().setId(id).build();
responseObserver.onNext(adId);
responseObserver.onCompleted();
}
@Override
public void getAd(cn.beckbi.pb.AdInfo.AdId request,
io.grpc.stub.StreamObserver<cn.beckbi.pb.AdInfo.Ad> responseObserver) {
int id = request.getId();
logger.info("id:" id);
if (adInfoMap.containsKey(id)) {
responseObserver.onNext((AdInfo.Ad) adInfoMap.get(id));
responseObserver.onCompleted();
}else {
responseObserver.onError(new StatusException(Status.NOT_FOUND));
}
}
}