如何在Kerberos环境下使用Flume采集Kafka数据写入HBase

2018-07-12 15:21:50 浏览数 (1)

在前面的文章Fayson也介绍了一些关于Flume的文章《非Kerberos环境下Kafka数据到Flume进Hive表》、《如何使用Flume准实时建立Solr的全文索引》、《如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS》、《如何使用Flume采集Kafka数据写入Kudu》和《如何使用Flume采集Kafka数据写入HBase》。本篇文章Fayson主要介绍在Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。

  • 内容概述

1.环境准备

2.配置Flume Agent

3.流程测试

4.总结

  • 测试环境

1.CM和CDH版本为5.12.1

2.采用root用户操作

  • 前置条件

1.Flume已安装

2.HBase和Kafka已安装且已启用Kerberos

3.集群已启用Kerberos

2.环境准备


1.编写向Kafka生成数据的ReadUserInfoFileToKafka.java代码,具体内容可以在Fayson的GitHub上查看

脚本目录说明:

conf:该目录下是kafka、krb5.conf及keytab配置文件,如下目录的配置文件不需要修改名称,只需要修改相应的内容即可

jaas.conf文件内容:

代码语言:javascript复制
[root@cdh01 0285-kafka-shell]# vim conf/jaas.conf 
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/root/0285-kafka-shell/conf/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

(可左右滑动)

fayson.keytab文件生成,登录KDC和Kadmin所在服务器执行如下命令生成keytab文件

代码语言:javascript复制
[root@cdh01 ~]# kadmin.local 
Authenticating as principal hbase/admin@FAYSON.COM with password.
kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM

(可左右滑动)

使用klist查看导出的keytab文件是否正确

代码语言:javascript复制
[root@cdh01 ~]# klist -el fayson.keytab

(可左右滑动)

lib:向Kafka生产消息的依赖包, kafka-demo-1.0-SNAPSHOT.jar为自己开发的向Kerberos环境发送消息的示例程序

ods_user_600.txt:测试数据文件,共600条测试数据

run.sh:运行脚本

代码语言:javascript复制
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic
#
########################################
JAVA_HOME=/usr/java/jdk1.8.0_144
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.kerberos.ReadUserInfoFileToKafka $read_file

(可左右滑动)

以上脚本根据自己的环境修改相应配置即可,具体脚本可以查看Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0285-kafka-shell

2.通过hbase shell命令创建HBase测试表,并为fayson用户授权该表的读写

代码语言:javascript复制
create 'fayson_ods_deal_daily','info'
grant 'fayson','RWXCA','fayson_ods_deal_daily'

(可左右滑动)

注:由于HBase启用了Kerberos,所以我们这里在建表的同时需要为该表赋予给fayson用户,启动hbase shell命令需要使用hbase用户进行kinit操作。

3.配置Flume Agent


1.准备Flume使用的jaas.conf文件内容如下:

代码语言:javascript复制
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/opt/cloudera/parcels/flume-kerberos/fayson.keytab"
  principal="fayson@FAYSON.COM";
};
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/opt/cloudera/parcels/flume-kerberos/fayson.keytab"
  principal="fayson@FAYSON.COM";
};

(可左右滑动)

2.在CDH集群的所有节点的/opt/cloudera/parcels目录下创建flume-kerbers目录

代码语言:javascript复制
[root@cdh01 shell]# sh ssh_do_all.sh node.list "mkdir -p /opt/cloudera/parcels/flume-kerberos"

(可左右滑动)

将准备好的fayson.keytab和jaas.conf文件拷贝至集群所有节点的/opt/cloudera/parcels/flume-kerberos目录下

注:这里的jaas.conf文件是为Flume准备的配置文件,不要用到生产Kafka消息的文件。

为该目录下的文件授予755权限

代码语言:javascript复制
[root@cdh03 flume-kerberos]# chown -R flume. *
[root@cdh03 flume-kerberos]# chmod -R 755 *

(可左右滑动)

注:这里flume-kerberos及目录下的文件可以不用集群所有节点均存在,至少要保证Flume服务所在节点存在,目录下的文件权限需要调整否则会出现一些莫名其妙的异常。

3.登录CM,进flume服务界面,点击“配置”

4.在Agent类别的“配置文件”中输入如下内容:

代码语言:javascript复制
kafka.sources  = source1
kafka.channels = channel1
kafka.sinks = sink1
kafka.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.source1.kafka.bootstrap.servers = cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092
kafka.sources.source1.kafka.topics = kafka_sparkstreaming_hbase_topic
kafka.sources.source1.kafka.consumer.group.id = flume-consumer
kafka.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
kafka.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
kafka.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

kafka.sources.source1.channels = channel1
kafka.channels.channel1.type = memory
kafka.channels.channel1.capacity = 10000
kafka.channels.channel1.transactionCapacity = 1000
kafka.sinks.sink1.channel = channel1

kafka.sinks.sink1.type = com.cloudera.hbase.FaysonHBaseSink
kafka.sinks.sink1.table = fayson_ods_deal_daily
kafka.sinks.sink1.columnFamily = info
kafka.sinks.sink1.rowkeys = id,mobile_phone_num
kafka.sinks.sink1.serializer = com.cloudera.hbase.JsonHBaseEventSerializer
kafka.sinks.sink1.kerberosPrincipal = fayson@FAYSON.COM
kafka.sinks.sink1.kerberosKeytab = /opt/cloudera/parcels/CDH/lib/flume-ng/kerberos/fayson.keytab

(可左右滑动)

注:配置与Fayson前面讲的非Kerberos环境下有些不一样,增加了Kerberos的配置,这里的HBaseSink还是使用的Fayson自定义的Sink,具体可以参考前一篇文章《如何使用Flume采集Kafka数据写入HBase》

5.修改Flue Agent服务的启动参数

在Flume Agent的Java配置选项中增加如下配置:

代码语言:javascript复制
-Djava.security.auth.login.config=/opt/cloudera/parcels/flume-kerberos/jaas.conf

(可左右滑动)

6.保存flume配置,并重启Flume服务

4.流程测试


1.进入0285-kafka-shell目录执行命令向Kafka发送消息

代码语言:javascript复制
[root@cdh01 0285-kafka-shell]# sh run.sh ods_user_600.txt

(可左右滑动)

2.在命令行使用hbase shell查看fayson_ods_deal_daily表

代码语言:javascript复制
[root@cdh01 ~]# klist
[root@cdh01 ~]# hbase shell
hbase(main):001:0> list
hbase(main):002:0> scan 'fayson_ods_deal_daily'

(可左右滑动)

可以看到数据已写入到HBase的fayson_ods_deal_daily表,查看表总数与发送Kafka数量一致

5.总结


1.由于集群启用了Kerberos,向Kafka生成消息的应用脚本需要做相应的修改,在代码中增加Kerberos认证,具体示例代码这里Fayson未贴出来可以通过文章末尾的GitHub地址查阅。

2. Flume中使用的HBaseSink是Fayson前面一篇文章中将的自定义HBaseSink,可以指定HBase表的rowkey及支持Kerberos认证。

3.在配置Flume访问Kerberos环境的Kafka和HBase时需要为Flume的启动参数中增加jaas.conf指定Kerberos信息。

4.为Flume指定的jaas.conf和keytab文件要确保Flume用于有访问权限,佛则启动Flume时会报错。

5.由于HBase启用了Kerberos,所以我们在使用fayson用户向HBase表中写入数据时要先使用hbase用户启动hbase shell为fayson用于授予fayson_ods_deal_daily表的读写权限。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/kerberos/ReadUserInfoFileToKafka.java

https://github.com/fayson/cdhproject/tree/master/kafkademo/src/main/resources

自定义HBaseSink示例代码如下:

https://github.com/fayson/cdhproject/tree/master/flumesink/src/main/java/com/cloudera/hbase

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

0 人点赞