在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》、《如何使用Flume采集Kafka数据写入Kudu》和《如何使用Flume采集Kafka数据写入HBase》。本篇文章Fayson主要介绍在Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。
- 内容概述
1.环境准备
2.配置Flume Agent
3.流程测试
4.总结
- 测试环境
1.CM和CDH版本为5.12.1
2.采用root用户操作
- 前置条件
1.Flume已安装
2.HBase和Kafka已安装且已启用Kerberos
3.集群已启用Kerberos
2.环境准备
1.编写向Kafka生成数据的ReadUserInfoFileToKafka.java代码,具体内容可以在Fayson的GitHub上查看
脚本目录说明:
conf:该目录下是kafka、krb5.conf及keytab配置文件,如下目录的配置文件不需要修改名称,只需要修改相应的内容即可
jaas.conf文件内容:
代码语言:javascript复制[root@cdh01 0285-kafka-shell]# vim conf/jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/root/0285-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
(可左右滑动)
fayson.keytab文件生成,登录KDC和Kadmin所在服务器执行如下命令生成keytab文件
代码语言:javascript复制[root@cdh01 ~]# kadmin.local
Authenticating as principal hbase/admin@FAYSON.COM with password.
kadmin.local: xst -norandkey -k fayson.keytab fayson@FAYSON.COM
(可左右滑动)
使用klist查看导出的keytab文件是否正确
代码语言:javascript复制[root@cdh01 ~]# klist -el fayson.keytab
(可左右滑动)
lib:向Kafka生产消息的依赖包, kafka-demo-1.0-SNAPSHOT.jar为自己开发的向Kerberos环境发送消息的示例程序
ods_user_600.txt:测试数据文件,共600条测试数据
run.sh:运行脚本
代码语言:javascript复制#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic
#
########################################
JAVA_HOME=/usr/java/jdk1.8.0_144
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.kerberos.ReadUserInfoFileToKafka $read_file
(可左右滑动)
以上脚本根据自己的环境修改相应配置即可,具体脚本可以查看Fayson的GitHub:
https://github.com/fayson/cdhproject/tree/master/kafkademo/0285-kafka-shell
2.通过hbase shell命令创建HBase测试表,并为fayson用户授权该表的读写
代码语言:javascript复制create 'fayson_ods_deal_daily','info'
grant 'fayson','RWXCA','fayson_ods_deal_daily'
(可左右滑动)
注:由于HBase启用了Kerberos,所以我们这里在建表的同时需要为该表赋予给fayson用户,启动hbase shell命令需要使用hbase用户进行kinit操作。
3.配置Flume Agent
1.准备Flume使用的jaas.conf文件内容如下:
代码语言:javascript复制Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/opt/cloudera/parcels/flume-kerberos/fayson.keytab"
principal="fayson@FAYSON.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/opt/cloudera/parcels/flume-kerberos/fayson.keytab"
principal="fayson@FAYSON.COM";
};
(可左右滑动)
2.在CDH集群的所有节点的/opt/cloudera/parcels目录下创建flume-kerbers目录
代码语言:javascript复制[root@cdh01 shell]# sh ssh_do_all.sh node.list "mkdir -p /opt/cloudera/parcels/flume-kerberos"
(可左右滑动)
将准备好的fayson.keytab和jaas.conf文件拷贝至集群所有节点的/opt/cloudera/parcels/flume-kerberos目录下
注:这里的jaas.conf文件是为Flume准备的配置文件,不要用到生产Kafka消息的文件。
为该目录下的文件授予755权限
代码语言:javascript复制[root@cdh03 flume-kerberos]# chown -R flume. *
[root@cdh03 flume-kerberos]# chmod -R 755 *
(可左右滑动)
注:这里flume-kerberos及目录下的文件可以不用集群所有节点均存在,至少要保证Flume服务所在节点存在,目录下的文件权限需要调整否则会出现一些莫名其妙的异常。
3.登录CM,进flume服务界面,点击“配置”
4.在Agent类别的“配置文件”中输入如下内容:
代码语言:javascript复制kafka.sources = source1
kafka.channels = channel1
kafka.sinks = sink1
kafka.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.source1.kafka.bootstrap.servers = cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.sources.source1.kafka.topics = kafka_sparkstreaming_hbase_topic
kafka.sources.source1.kafka.consumer.group.id = flume-consumer
kafka.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
kafka.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
kafka.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
kafka.sources.source1.channels = channel1
kafka.channels.channel1.type = memory
kafka.channels.channel1.capacity = 10000
kafka.channels.channel1.transactionCapacity = 1000
kafka.sinks.sink1.channel = channel1
kafka.sinks.sink1.type = com.cloudera.hbase.FaysonHBaseSink
kafka.sinks.sink1.table = fayson_ods_deal_daily
kafka.sinks.sink1.columnFamily = info
kafka.sinks.sink1.rowkeys = id,mobile_phone_num
kafka.sinks.sink1.serializer = com.cloudera.hbase.JsonHBaseEventSerializer
kafka.sinks.sink1.kerberosPrincipal = fayson@FAYSON.COM
kafka.sinks.sink1.kerberosKeytab = /opt/cloudera/parcels/CDH/lib/flume-ng/kerberos/fayson.keytab
(可左右滑动)
注:配置与Fayson前面讲的非Kerberos环境下有些不一样,增加了Kerberos的配置,这里的HBaseSink还是使用的Fayson自定义的Sink,具体可以参考前一篇文章《如何使用Flume采集Kafka数据写入HBase》
5.修改Flue Agent服务的启动参数
在Flume Agent的Java配置选项中增加如下配置:
代码语言:javascript复制-Djava.security.auth.login.config=/opt/cloudera/parcels/flume-kerberos/jaas.conf
(可左右滑动)
6.保存flume配置,并重启Flume服务
4.流程测试
1.进入0285-kafka-shell目录执行命令向Kafka发送消息
代码语言:javascript复制[root@cdh01 0285-kafka-shell]# sh run.sh ods_user_600.txt
(可左右滑动)
2.在命令行使用hbase shell查看fayson_ods_deal_daily表
代码语言:javascript复制[root@cdh01 ~]# klist
[root@cdh01 ~]# hbase shell
hbase(main):001:0> list
hbase(main):002:0> scan 'fayson_ods_deal_daily'
(可左右滑动)
可以看到数据已写入到HBase的fayson_ods_deal_daily表,查看表总数与发送Kafka数量一致
5.总结
1.由于集群启用了Kerberos,向Kafka生成消息的应用脚本需要做相应的修改,在代码中增加Kerberos认证,具体示例代码这里Fayson未贴出来可以通过文章末尾的GitHub地址查阅。
2. Flume中使用的HBaseSink是Fayson前面一篇文章中将的自定义HBaseSink,可以指定HBase表的rowkey及支持Kerberos认证。
3.在配置Flume访问Kerberos环境的Kafka和HBase时需要为Flume的启动参数中增加jaas.conf指定Kerberos信息。
4.为Flume指定的jaas.conf和keytab文件要确保Flume用于有访问权限,佛则启动Flume时会报错。
5.由于HBase启用了Kerberos,所以我们在使用fayson用户向HBase表中写入数据时要先使用hbase用户启动hbase shell为fayson用于授予fayson_ods_deal_daily表的读写权限。
GitHub地址:
https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/kerberos/ReadUserInfoFileToKafka.java
https://github.com/fayson/cdhproject/tree/master/kafkademo/src/main/resources
自定义HBaseSink示例代码如下:
https://github.com/fayson/cdhproject/tree/master/flumesink/src/main/java/com/cloudera/hbase
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操