构建流
gRPC 利用 HTTP/2
的双向通信特性实现了连续的消息交换
,实现了双向流
。
数据流是什么?
流数据有各种各样的场景用法
。一种是,当事件发生时,有一种方法可以不断发出描述事件的消息
。例如,当数据被添加到数据库中时,数据库希望将“数据添加”事件通知相关方。或者当股票价格发生变化时,证券交易所希望向订阅“价格变化”事件的所有服务公布新价格。
另一个是有一种以异步方式高效传输非常大的数据集的方法
。例如,假设您有一个返回一百万条记录的查询,其中每条记录对调用者都有值。能够在每条记录以流的方式进入时检查它,比等到所有100万条记录都收到后再批量处理它们要有效得多。或者,想象你有一个电视控制台,想要摄入一部电影5分钟的时间来处理。通过流消费和处理5分钟的片段意味着消费者可以在电影进入时持续观看,而不必等到整部电影下载后才能观看。
环境准备
- python 3.7
- pip 最新版, 可以用下面的命令更新
python -m pip install --upgrade pip
安装 gRPC
python -m pip install grpcio
这个还需要安装一个 gRPC tools
, Python 的 gRPC 工具包括协议缓冲编译器 protoc 和用于从 .proto服务定义。
python -m pip install grpcio-tools
为了学习,这个提供要给demo
代码语言:javascript复制git clone -b v1.33.1 https://github.com/grpc/grpc
# 国内可以访问这个
git clone git@gitee.com:chasays/grpc.git
clone 之前需要安装 protoc 和 grpc_python_plugin
- protoc 可以直接用
brew install protoc
, 如果是其他的类似,或者下载protoc,然后解压后放到环境里面 - grpc_python_plugin 这个在上面clone的工程里面,编译
make grpc_python_plugin
然后把编译好的文件copy到PATH里面即可
(base) ➜ grpc git:(master) ✗ make grpc_python_plugin
[HOSTCXX] Compiling src/compiler/cpp_generator.cc
[HOSTCXX] Compiling src/compiler/csharp_generator.cc
[HOSTCXX] Compiling src/compiler/objective_c_generator.cc
[HOSTCXX] Compiling src/compiler/python_generator.cc
[HOSTCXX] Compiling src/compiler/ruby_generator.cc
[AR] Creating /Users/admin/Documents/OpenSource/grpc/libs/opt/libgrpc_plugin_support.a
[HOSTCXX] Compiling src/compiler/python_plugin.cc
[HOSTLD] Linking /Users/admin/Documents/OpenSource/grpc/bins/opt/grpc_python_plugin
(base) ➜ sudo cp bins/opt/grpc_python_plugin /usr/local/bin/
先来看一个最简单的 helloworld
然后切换目录到cd grpc/examples/python/helloworld
执行run_codegen.sh
,即可生成helloworld_pb2.py
文件。
然后依次执行 greeter_server.py
和greeter_client.py
。 就可以看到输出
python greeter_server.py
# 再开一个,shell进程执行
python greeter_client.py
# 需要注意执行client的时候一定要用python2, 用py3需要修改下文件里面print这句
(base) ➜ helloworld git:(master) ✗ python2 greeter_client.py
Greeter client received: Hello, you!
这个里面有个 stub
,需要提一下, 网上看了下,这个解释是不错的。
写码的时候你会遇到一些外部依赖,比如在本机上写代码,可能会调用谷歌的API,来完成远程调用。而我在做测试的时候并不想真的发出这个请求,(贵,得不到想要的结果),因此我选择通过某种方式(Mockito)来进行模拟。Stub指的就是这种模拟,把服务端的依赖用本机来进行模拟
也可以用 Bloomrpc
导入 protoc文件,然后直接执行。注意用这个执行之前需要启动 server
。
Streaming
要定义一个服务,你需要在你的. proto 文件中指定一个命名的服务:
代码语言:javascript复制service RouteGuide {
// (Method definitions not shown)
}
然后在服务定义中定义 rpc 方法,指定它们的请求和响应类型。让你定义四种
服务方法,所有这些都在 RouteGuide 服务中使用:
- 一个简单的 RPC,其中客户端使用存根向服务器发送请求,并等待响应返回,就像
普通的函数调用一样
。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- 一种
响应流
RPC,其中客户端向服务器发送请求,并获得一个流来读取一系列消息
。客户端从返回的流中读取,直到没有更多的消息。正如您在示例中看到的,您通过将 stream 关键字放在 response 类型之前来指定 response-streaming 方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 一种
请求流式
RPC,其中客户机写入一系列消息并将它们发送到服务器,同样使用提供的流。一旦客户端完成了消息的写入
,它就会等待服务器读取所有消息并返回响应。通过将 stream 关键字放在请求类型之前,可以指定请求流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 一种
双向流式
RPC,其中双方使用读写流发送一系列消息。这两个流独立运行
,因此客户端和服务器可以按照自己喜欢的顺序读写: 例如,服务器可以等待接收所有客户端消息后再写响应,或者可以交替读取消息然后写入消息,或者其他读写组合。保留了每个流中消息的顺序。通过将 stream 关键字放在请求和响应之前,可以指定这种类型的方法。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
然后用 如下命令生成 python代码。
代码语言:javascript复制$ python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/route_guide.proto
代码语言:javascript复制构建server和client的代码略。
https://github.com/grpc/grpc/blob/v1.33.1/examples/python/route_guide/route_guide_server.py
https://github.com/grpc/grpc/blob/v1.33.1/examples/python/route_guide/route_guide_client.py
启动server, 然后 push data,就可以在response看到对应的消息。
用protobuf 实现序列化和反序列化
用python来举例吧,比如序列化就是request,用 SerializeToString
, 反序列化就用 FromString
。
request_serializers = {
('helloworld.Greeter', 'SayHello'): helloworld_pb2.HelloRequest.SerializeToString,
}
response_deserializers = {
('helloworld.Greeter', 'SayHello'): helloworld_pb2.HelloReply.FromString,
}