温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
Fayson的github: https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
在前面的文章Fayson介绍了一些关于SparkStreaming的示例《如何使用Spark Streaming读取HBase的数据并写入到HDFS》、《SparkStreaming读Kafka数据写HBase》和《SparkStreaming读Kafka数据写Kudu》以上文章均是非Kerberos环境下的讲解,本篇文章Fayson主要介绍如何使用Spark2Streaming访问Kerberos环境的Kafka并将接收到的Kafka数据写入Kudu,在介绍本篇文章前,你可能需要知道:《如何在CDH集群启用Kerberos》《如何通过Cloudera Manager为Kafka启用Kerberos及使用》
示例架构图如下:
示例详细流程图如下:
- 内容概述:
1.环境准备
2.SparkStreaming示例开发
3.示例运行
4.总结
- 测试环境:
1.CM5.14.3/CDH5.14.2
2.CDK2.2.0(Apache Kafka0.10.2)
3.SPARK2.2.0
4.操作系统版本为Redhat7.3
5.采用root用户进行操作
6.集群已启用Kerberos
2.环境准备
1.准备访问Kafka的Keytab文件,使用xst命令导出keytab文件
代码语言:javascript复制[root@cdh01 ~]# kadmin.local
Authenticating as principal hbase/admin@FAYSON.COM with password.
kadmin.local: xst -norandkey -k fayson.keytab fayson@FAYSON.COM
(可左右滑动)
使用klist命令检查导出的keytab文件是否正确
代码语言:javascript复制[root@cdh01 ~]# klist -ek fayson.keytab
(可左右滑动)
2.准备jaas.cof文件内容如下:
代码语言:javascript复制KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
(可左右滑动)
将fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下。
3.准备向Kerberos环境发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:
代码语言:javascript复制https://github.com/fayson/cdhproject/tree/master/kafkademo/0286-kafka-shell
(可左右滑动)
根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:
代码语言:javascript复制{
"occupation": "生产工作、运输工作和部分体力劳动者",
"address": "台东东二路16号-8-8",
"city": "长治",
"marriage": "1",
"sex": "1",
"name": "仲淑兰",
"mobile_phone_num": "13607268580",
"bank_name": "广州银行31",
"id": "510105197906185179",
"child_num": "1",
"fix_phone_num": "15004170180"
}
(可左右滑动)
4.登录CM进入SPARK2f服务的配置项将spark_kafka_version的kafka版本修改为0.10
3.SparkStreaming示例开发
1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下
代码语言:javascript复制<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.6.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.6.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0.cloudera2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0.cloudera2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0.cloudera2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0.cloudera2</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
(可左右滑动)
2.在resources下创建0288.properties配置文件,内容如下:
代码语言:javascript复制kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
kafka.topics=Kafka_kudu_topic
kudumaster.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com
(可左右滑动)
3.创建Kafka2Spark2Kudu.scala文件,内容如下:
代码语言:javascript复制package com.cloudera.streaming
import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSON
/**
* package: com.cloudera.streaming
* describe: Kerberos环境中Spark2Streaming 应用实时读取Kafka数据,解析后存入Kudu
* 使用spark2-submit的方式提交作业
spark2-submit --class com.cloudera.streaming.Kafka2Spark2Kudu
--master yarn
--deploy-mode client
--executor-memory 2g
--executor-cores 2
--driver-memory 2g
--num-executors 2
--queue default
--principal fayson@FAYSON.COM
--keytab /data/disk1/0286-kafka-shell/conf/fayson.keytab
--driver-java-options "-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf"
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf"
spark2-demo-1.0-SNAPSHOT.jar
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/06/18
* creat_time: 下午11:09
* 公众号:Hadoop实操
*/
object Kafka2Spark2Kudu {
Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别
var confPath: String = System.getProperty("user.dir") File.separator "conf/0288.properties"
/**
* 建表Schema定义
*/
val userInfoSchema = StructType(
// col name type nullable?
StructField("id", StringType , false) ::
StructField("name" , StringType, true ) ::
StructField("sex" , StringType, true ) ::
StructField("city" , StringType, true ) ::
StructField("occupation" , StringType, true ) ::
StructField("tel" , StringType, true ) ::
StructField("fixPhoneNum" , StringType, true ) ::
StructField("bankName" , StringType, true ) ::
StructField("address" , StringType, true ) ::
StructField("marriage" , StringType, true ) ::
StructField("childNum", StringType , true ) :: Nil
)
/**
* 定义一个UserInfo对象
*/
case class UserInfo (
id: String,
name: String,
sex: String,
city: String,
occupation: String,
tel: String,
fixPhoneNum: String,
bankName: String,
address: String,
marriage: String,
childNum: String
)
def main(args: Array[String]): Unit = {
//加载配置文件
val properties = new Properties()
val file = new File(confPath)
if(!file.exists()) {
System.out.println(Kafka2Spark2Kudu.getClass.getClassLoader.getResource("0288.properties"))
val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0288.properties")
properties.load(in);
} else {
properties.load(new FileInputStream(confPath))
}
val brokers = properties.getProperty("kafka.brokers")
val topics = properties.getProperty("kafka.topics")
val kuduMaster = properties.getProperty("kudumaster.list")
println("kafka.brokers:" brokers)
println("kafka.topics:" topics)
println("kudu.master:" kuduMaster)
if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(kuduMaster)) {
println("未配置Kafka和KuduMaster信息")
System.exit(0)
}
val topicsSet = topics.split(",").toSet
val spark = SparkSession.builder().appName("Kafka2Spark2Kudu-kerberos").config(new SparkConf()).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
, "auto.offset.reset" -> "latest"
, "security.protocol" -> "SASL_PLAINTEXT"
, "sasl.kerberos.service.name" -> "kafka"
, "key.deserializer" -> classOf[StringDeserializer]
, "value.deserializer" -> classOf[StringDeserializer]
, "group.id" -> "testgroup"
)
val dStream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
//引入隐式
import spark.implicits._
val kuduContext = new KuduContext(kuduMaster, spark.sparkContext)
//判断表是否存在
if(!kuduContext.tableExists("user_info")) {
println("create Kudu Table :{user_info}")
val createTableOptions = new CreateTableOptions()
createTableOptions.addHashPartitions(List("id").asJava, 8).setNumReplicas(3)
kuduContext.createTable("user_info", userInfoSchema, Seq("id"), createTableOptions)
}
dStream.foreachRDD(rdd => {
//将rdd数据重新封装为Rdd[UserInfo]
val newrdd = rdd.map(line => {
val jsonObj = JSON.parseFull(line.value())
val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
new UserInfo(
map.get("id").get.asInstanceOf[String],
map.get("name").get.asInstanceOf[String],
map.get("sex").get.asInstanceOf[String],
map.get("city").get.asInstanceOf[String],
map.get("occupation").get.asInstanceOf[String],
map.get("mobile_phone_num").get.asInstanceOf[String],
map.get("fix_phone_num").get.asInstanceOf[String],
map.get("bank_name").get.asInstanceOf[String],
map.get("address").get.asInstanceOf[String],
map.get("marriage").get.asInstanceOf[String],
map.get("child_num").get.asInstanceOf[String]
)
})
//将RDD转换为DataFrame
val userinfoDF = spark.sqlContext.createDataFrame(newrdd)
kuduContext.upsertRows(userinfoDF, "user_info")
})
ssc.start()
ssc.awaitTermination()
}
}
(可左右滑动)
4.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile
代码语言:javascript复制mvn clean scala:compile package
(可左右滑动)
5.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务
在conf目录下新增0288.properties配置文件,内容如下:
4.示例运行
1.使用spark2-submit命令向集群提交SparkStreaming作业
代码语言:javascript复制spark2-submit --class com.cloudera.streaming.Kafka2Spark2Kudu
--master yarn
--deploy-mode client
--executor-memory 2g
--executor-cores 2
--driver-memory 2g
--num-executors 2
--queue default
--principal fayson@FAYSON.COM
--keytab /data/disk1/0286-kafka-shell/conf/fayson.keytab
--driver-java-options "-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf"
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf"
spark2-demo-1.0-SNAPSHOT.jar
(可左右滑动)
通过CM查看作业是否提交成功
Spark2的UI界面
2.通过Kudu Master的管理界面可以看到user_info表已创建
点击Table Id列进入user_info表详情页,获取Impala的建表语句:
代码语言:javascript复制CREATE EXTERNAL TABLE `user_info` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'user_info',
'kudu.master_addresses' = 'cdh01.fayson.com:7051,cdh02.fayson.com:7051,cdh03.fayson.com:7051')
(可左右滑动)
3.运行脚本向Kafka的Kafka_kudu_topic生产消息
4.登录Hue在Impala中执行上面的建表语句
执行Select查询user_info表中数据,数据已成功入库
5.总结
1.本示例中SparkStreaming读取Kerberos环境的Kafka集群,使用的是spark-streaming-kafka0.10.0版本的依赖包,在Spark中提供两个的另外一个版本的为0.8.0版本,在选择依赖包时需要注意与Spark版本的兼容性问题,具体可以参考官网地址:
代码语言:javascript复制http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html
(可左右滑动)
2.在/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下需要检查下是否有其它版本的spark-streaming-kafka的依赖包,如果存在需要删除,否则会出现版本冲突问题。
3.在前面的文章Fayson也有介绍Java访问Kerberos环境的Kafka,需要使用到jaas.conf文件,这里的jaas.conf文件Fayson通过spark2-submit的方式指定,注意我们的jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的。
4.同样在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数。
5.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10
GitHub地址如下:
https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/streaming/Kafka2Spark2Kudu.scala
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。
原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操