导语
Apache Pulsar是Yahoo开源的MQ解决方案,功能上跟Kafka、RocketMQ、TubeMQ等类似,同时支持多租户、读写分离、跨地域复制等特性。联邦学习作为新一代人工智能基础技术,通过解决数据隐私与数据孤岛问题,重塑金融、医疗、城市安防等领域。本文将介绍Pulsar在Angel PowerFL 联邦学习平台中的应用,探索MQ和联邦学习的跨界合作过程。
01
背 景
Angel PowerFL联邦学习平台及其通信模块要求
Angel PowerFL联邦学习平台构建在Angel之上,利用AngelPS支持万亿级模型训练的能力,将很多在Worker上的计算提升到PS端;Angel PowerFL为联邦学习算法提供了计算、加密、存储、状态同步等基本操作接口,通过流程调度模块协调参与方任务执行状态,而通信模块完成了任务训练 过程当中所有数据的传输。Angel PowerFL联邦学习已经在金融云、广告联合建模等业务中开始落地,并取得初步的效果。Angel PowerFL系统架构图如下:
Angel PowerFL的学习任务在训练过程当中,参与方之间会有大量的加密数据通过通信模块传输,Angel PowerFL对通信模块有以下要求:
- 稳定可靠 Angel PowerFL的学习任务时长从几分钟到几小时的都有,算法执行对数据的质量要求很高,不同算法的数据传输峰值也不一样,这需要通信模块的服务足够的稳定,并且不能丢数据。
- 高性能传输 Angel PowerFL底层通过Spark进行计算,Executor并发执行会产生很多待传输的中间数据,通信模块需要将这些加密后的数据及时传输到对方,这就要求通信服务延时低、吞吐量尽可能高。
- 数据安全 虽然Angel PowerFL所有数据都通过加密模块进行了加密,但参与联邦学习的业务可能分布在不同公司,跨公网进行传输,需要通信模块足够安全,不易被攻击。
Pulsar及Pulsar GeoReplication
Pulsar是新一代消息队列系统,架构上做了计算与存储的分离,MQ的逻辑主要放在Pulsar Broker完成,存储层使用Apache BookKeeper作为分布式一致性存储,相比于传统MQ的一些优势:
- Broker和Bookie相互独立,可以独立的扩展,独立的容错,提升系统的可用性
- 分区存储不受限于单个节点存储容量,数据分布更加均匀
- Bookkeeper提供可靠的存储,保证消息不丢失
相比较传统的MQ解决方案,针对跨越多个数据中心的多Pulsar集群,Pulsar提供了地域复制功能,即Pulsar GeoReplication。GeoReplication同时支持同步地域复制和异步地域复制,甚至可以在Message级别通过 setReplicationClusters控制消息复制到哪些集群。
在上图中,无论Producer P1、P2和P3在什么时候分别将消息发布给Cluster A、Cluster B和Cluster C中的topic T1,这些消息均会立刻复制到整个集群。一旦完成复制,Consumer C1和C2即可从自己所在的集群消费这些消息。
02
基于Pulsar的Angel PowerFL通信模块实现
参与联邦学习的各个业务(Angel PowerFL称之为Party,每个Party有不同的ID,比方说 10000/20000),可能分布在同个公司的不同部门(无网络隔离),也可能分布在不同公司(跨公网),各个Party之间通过Pulsar GeoReplication进行同步复制,总的设计方案如下:
联邦学习的每个训练任务,通过消息生产者(Prouder)和消费者(Consumer)连接所在Party的Pulsar集群,集群名以fl-pulsar-[partyID] 进行区分,训练任务产生需要传输的中间数据后,生产者负责将这些数据发送给本地Pulsar集群。Pulsar集群收到数据后,通过Pulsar Proxy建立的同步复制网络通道,将数据发送给使用方Party。而使用方Party的消费者,会一直监听该训练任务对应的Topic,当有数据达到后,直接消费数据进行下一步的计算。
Angel PowerFL支持多方联邦,意味着存在大于2个Pulsar集群开启了同步复制。每个联邦学习任务,通过各自的parties任务参数指定了参与方,生产者在发送消息时调用 setReplicationClusters接口,保证了数据只在参与party之间传输。
使用Pulsar作为Angel PowerFL的通信模块,在方案的实现的过程当中,使用了Pulsar很多其它特性,同时也做了些优化,包括GeoReplication去掉Global Zookeeper依赖、Client增加Token认证、多集群Topic自动回收等内容,下面将一一具体介绍。
GeoReplication去掉Global Zookeeper依赖
一个完整的Pulsar的部署,依赖两个ZooKeeper集群,分别是Local ZooKeeper和Global ZooKeeper。Local ZooKeeper和Kafka中的ZooKeeper作用类似,用来存储元数据。而 Global ZooKeeper则是用于Pulsar多集群,便于多个集群间的配置信息共享。
在Angel PowerFL的场景中,每个Party在加入前,都要先部署一个Global ZooKeeper的子节点,或者共用一套跨公司或跨地域的公共ZooKeeper,这样不仅会增加部署的难度,也会增加被攻击的风险,在推广上不利于说服新Party的加入。
Global ZooKeeper中存储的元数据,主要是集群名/服务地址/namespace权限等信息,并且Pulsar支持新集群的创建和加入。我们通过以下两个步骤注册联邦Pulsar集群的信息到Local ZooKeeper,就去除了对Global Zookeeper的依赖。
- 注册新加入Party Pulsar集群
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} --url http://${OTHER_CLUSTER_HTTP_URL} --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}
- 授予namespace权限
./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}
对于新加入的Party,只用提供对应Pulsar的集群名/服务地址即可完成注册,GeoReplication就可以通过注册信息进行同步复制。
Client增加Token认证
Pulsar作为Angel PowerFL的通信模块,没有加入用户级别的权限控制,但为了进一步保证Client生产和消费数据的安全,参考Pulsar Token authentication(请复制链接到浏览器查看:https://pulsar.apache.org/docs/en/security-token-admin/)增加了Token认证,Angel PowerFL的训练任务除了配置当前party使用的服务地址外,还需要配置admin token。由于Angel PowerFL整套系统是有kubernetes部署的,我们通过容器生成Pulsar集群需要的Public/Private keys:
# fl-private.key and fl-public.keydocker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create-key-pair --output-private-key /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# admin-token.txt token fileecho -n `docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create --private-key file:///tmp/fl-private.key --subject admin` # prepare files for pulsar podskubectl create secret generic token-symmetric-key --from-file=TOKEN=admin-token.txt --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}
多集群Topic自动回收
由于Pulsar集群开启了GeoReplication功能后,无法通过命令直接删除用过的Topic,而Angel PowerFL训练任务每次使用过的task是一次性的,任务结束这些Topic就没用了,如果不及时删除就会出现大量累积。
Pulsar对于GeoReplication的Topic,可以通过 brokerDeleteInactiveTopicsEnabled配置开启 Topic自动回收,自动回收无用的Topic,需满足以下几个条件:
- 当前Topic没有连接的生产者或者消费者
- 当前Topic没有被订阅
- 当前Topic没有需要保留的Message
Angel PowerFL部署的Pulsar集群,通过 brokerDeleteInactiveTopicsEnabled开启了Topic自动回收,在训练任务执行过程当中,每个Topic在使用完后就按回收条件进行了处置。同时,我们增加
brokerDeleteInactiveTopicsFrequencySeconds配置将回收的频率设置为3小时。
开启Topic限流
Angel PowerFL中的训练任务,在不同的数据集/算法/执行阶段,生产数据的流量峰值也不同(生产环境目前观察到,单个任务最大的数据量超过200G/小时),如果训练过程中出现Pulsar断连或者生产消费的异常,整个训练任务都要重新跑。
为了让Pulsar集群不被单个训练任务冲垮,我们使用了Pulsar的限流功能。Pulsar支持 messagerate和byte-rate两种生产限流策略,前者是限制每秒生产message的条数,后者是限制每秒生产的message大小。由于Angel PowerFL会将数据切分成4M大小的Message,最好的选择是通过messagerate限制生产Message的条数。在Angel PowerFL中,我们将namespace的message限制为30条(小于<30*4=120M/s):
./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30
刚开始测试message-rate的限流功能时,出现了限不住的情况(限流设置失效),经过TDbank 团队负责Pulsar的同事一起排查,优化了限流功能并贡献到了社区(topic publish rate limit not take effect,请复制链接到浏览器查看详情:
https://github.com/apache/pulsar/issues/6975)。
优化Topic Unloading配置
Pulsar基于Broker集群负载状况,可以将Topic 动态分配到Broker上。如果拥有该Topic的Broker宕机,或者拥有该Topic的Broker负载过大,则该Topic将立即重新分配给另一个Broker ,而重新分配的过程就是Topic的Unloading,该操作意味着关闭Topic,释放所有者。
理论上,Topic Unloading是种正常的负载均衡调整,客户端将经历极小的延迟抖动,通常耗时 10ms左右。但Angel PowerFL初期版本在跑训练任务时,日志爆出大量因为Unloading Topic导致的连接异常,日志显示在不断的重试,但都不成功:
[sub] Could not get connection to broker: Topic is temporarily unavailabl e -- Will try again in 0.1 s
先来看Broker/Namespace/Bundle/Topic三者的关系。Bundle是Pulsar Namespace的一个分片机制,Namespace被分片为Bundles列表,每个Bundle包含Namespace的整个哈希范围的一部分。Topic不是直接分配给broker的,而是通过计算Topic的哈希码来确定把Topic分配给特定的Bundle,每个Bundle都是互相独立,再被分配到不同的Broker上。
Angel PowerFL早期版本的任务Topic没有复用,一个LR算法训练任务创建超过2000个Topic,并且每个topic生产的数据负载也不同,我们判断短时间大量创建topic并且负载不均衡导致频繁Topic Unloading。为了降低Topic Unloading的频率,对Pulsar的Bundle相关参数做了调整:
# 增加broker可分配最大Topic数量loadBalancerBrokerMaxTopics=500000# 启用自动拆分namespace bundleloadBalancerAutoBundleSplitEnabled=true# 增加触发拆分bundle的topic数量loadBalancerNamespaceBundleMaxTopics=10000# 增加触发拆分bundle的消息数loadBalancerNamespaceBundleMaxMsgRate=10000
同时,在创建namespace设置默认bundles数量为64:
./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64
经过以上调整,Angel PowerFL在任务执行期间没有再出现过由于Topic Unloading导致的断连情况。
Pulsar On Kubernetes
Angel PowerFL的所有服务均通过Helm部署在Kubernetes上,Pulsar作为其中的一个Chart,可以很好的利用k8s的资源隔离、快速扩缩容等特性。在Angel PowerFL使用Helm部署Pulsar的实践中,我们总结了以下几个经验:
- 使用Local Persistent Volume作为存储 Pulsar是IO敏感的的服务,尤其bookie组件,在生产环境中建议使用SSD或独立的磁盘。Angel PowerFL在跑一些大数据集任务时,Pulsar经常出现No Bookies Available的异常,而这期间磁盘的util_max很高。我们通过Local Persistent Volume将bookie和zookeeper等其它组件挂载到单独的磁盘,减缓了磁盘IO的竞争。我们也测试过将pulsar的PV存储换成Ceph和NFS,性能都没有直接使用Local Persistent Volume好。
- 使用NodeSelector GeoReplication在做同步复制期间,Broker需要能够访问对方的pulsar proxy容器。Angel PowerFL将网关机单独打了标签,通过NodeSelector将Broker在可访问外网的网关机上。
- 配置useHostNameAsBookieID bookie是有状态的组件,为了让bookie pod重建后服务正常,需要配置 useHostNameAsBookieID,以保证向zookeeper注册的ID是pod的hostname。
以上就是本文的全部内容了,大家如果遇到了相关的技术问题,欢迎在文章下方留言。
以“#你问我答# 提问内容”的形式留言提问,就有机会得到专家回复,还将获得腾讯视频VIP月卡一张哦!
扫码关注 | 即刻了解腾讯大数据技术动态