SparkStreaming读Kafka数据写HBase

2018-07-11 16:40:05 浏览数 (1)

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github: https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面的文章Fayson介绍过《如何使用Spark Streaming读取HBase的数据并写入到HDFS》,关于SparkStreaming的应用场景很多,本篇文章Fayson主要介绍使用Scala语言开发一个SparkStreaming应用读取Kafka数据并写入HBase。本文的数据流图如下:

  • 内容概述

1.环境准备

2.编写SparkSteaming代码读取Kafka数据并写入HBase

3.流程测试

4.总结

  • 测试环境

1.CM和CDH版本为5.12.1

2.采用root用户操作

  • 前置条件

1.集群已安装Kafka

2.环境准备


1.编写向Kafka生成数据的ReadUserInfoFIleToKafka.java代码,具体内容可以在Fayson的GitHub上查看

代码语言:javascript复制
https://github.com/fayson/cdhproject/blob/master/kafkademo/src/main/java/com/cloudera/nokerberos/ReadUserInfoFIleToKafka.java
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell

(可左右滑动)

2.使用mvn命令将编写好的代码编译打包封装成脚本

代码语言:javascript复制
mvn clean package

(可左右滑动)

使用mvn命令将工程依赖包导出到lib目录

代码语言:javascript复制
mvn dependency:copy-dependencies -DoutputDirectory=/Users/fayson/Desktop/lib

(可左右滑动)

编写run.sh脚本

代码语言:javascript复制
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic
#
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadUserInfoFIleToKafka $read_file

(可左右滑动)

准备测试数据ods_user_600.txt

封装好的脚本目录结构如下:

将打包好的jar包拷贝至lib目录下。

3.创建用于测试的Kafka Topic

代码语言:javascript复制
kafka-topics --create --zookeeper cdh01.fayson.com:2181,cdh02.fayson.com:2181,cdh03.fayson.com:2181 --replication-factor 3 --partitions 3 --topic kafka_sparkstreaming_hbase_topic

(可左右滑动)

4.创建HBase表,用于测试

代码语言:javascript复制
create 'user_info','info'

(可左右滑动)

5.通过CM配置SparkStreaming应用依赖包spark-streaming-kafka_2.10-1.6.0-cdh5.12.1.jar

将依赖包部署至CDH集群所有节点的/opt/cloudera/parcels/CDH/jars目录,然后通过CM配置Spark GateWay的spark-env.sh配置

代码语言:javascript复制
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/opt/cloudera/parcels/CDH/jars/spark-streaming-kafka_2.10-1.6.0-cdh5.12.1.jar

(可左右滑动)

保存并重新部署客户端配置。

3.编写SparkStreaming应用


1.使用Maven创建Scala工程,工程依赖pom文件

代码语言:javascript复制
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-assembly_2.10</artifactId>
    <version>1.6.0-cdh5.12.1</version>
</dependency>

(可左右滑动)

2.编写获取HBase连接的HBaseUtil工具类,内容如下:

代码语言:javascript复制
package utils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
/**
  * package: utils
  * describe: HBase工具类
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/5/28
  * creat_time: 上午10:51
  * 公众号:Hadoop实操
  */
object HBaseUtil extends Serializable {
  /**
    * @param zkList Zookeeper列表已逗号隔开
    * @param port ZK端口号
    * @return
    */
  def getHBaseConn(zkList: String, port: String): Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zkList)
    conf.set("hbase.zookeeper.property.clientPort", port)
    val connection = ConnectionFactory.createConnection(conf)
    connection
  }
}

(可左右滑动)

3.编写Kafka2Spark2HBase.scala类,内容如下:

代码语言:javascript复制
package com.cloudera.streaming
import java.io.{File, FileInputStream, InputStreamReader}
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import utils.HBaseUtil
import scala.util.Try
import scala.util.parsing.json.JSON
/**
  * package: com.cloudera.streaming
  * describe: SparkStreaming 应用实时读取Kafka数据,解析后存入HBase
  * 使用spark-submit的方式提交作业
    spark-submit --class com.cloudera.streaming.Kafka2Spark2HBase 
    --master yarn-client --num-executors 1 --driver-memory 1g 
    --driver-cores 1 --executor-memory 1g --executor-cores 1 
    spark-demo-1.0-SNAPSHOT.jar cdh04.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092 kafka_sparkstreaming_hbase_topic
  * creat_user: Fayson 
  * email: htechinfo@163.com
  * creat_date: 2018/5/28
  * creat_time: 上午10:09
  * 公众号:Hadoop实操
  */
object Kafka2Spark2HBase {
  var confPath: String = System.getProperty("user.dir")   File.separator   "conf/0283.properties"
  def main(args: Array[String]): Unit = {
    //加载配置文件
    val properties = new Properties()
    val file = new File(confPath)
    if(!file.exists()) {
      System.out.println(Kafka2Spark2HBase.getClass.getClassLoader.getResource("0283.properties"))
      val in = Kafka2Spark2HBase.getClass.getClassLoader.getResourceAsStream("0283.properties")
      properties.load(in);
    } else {
      properties.load(new FileInputStream(confPath))
    }
    val brokers = properties.getProperty("kafka.brokers")
    val topicsSet = properties.getProperty("kafka.topics").split(",").toSet
    val zkHost = properties.getProperty("zookeeper.list")
    val zkport = properties.getProperty("zookeeper.port")
    val sparkConf = new SparkConf().setAppName("Kafka2Spark2HBase")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5)) //设置Spark时间窗口,每5s处理一次
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    dStream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val connection = HBaseUtil.getHBaseConn(zkHost, zkport) // 获取Hbase连接
        partitionRecords.foreach(line => {
          //将Kafka的每一条消息解析为JSON格式数据
          println(line._2)
          val jsonObj =  JSON.parseFull(line._2)
          val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
          val rowkey = map.get("id").get.asInstanceOf[String]
          val name = map.get("name").get.asInstanceOf[String]
          val sex = map.get("sex").get.asInstanceOf[String]
          val city = map.get("city").get.asInstanceOf[String]
          val occupation = map.get("occupation").get.asInstanceOf[String]
          val mobile_phone_num = map.get("mobile_phone_num").get.asInstanceOf[String]
          val fix_phone_num = map.get("fix_phone_num").get.asInstanceOf[String]
          val bank_name = map.get("bank_name").get.asInstanceOf[String]
          val address = map.get("address").get.asInstanceOf[String]
          val marriage = map.get("marriage").get.asInstanceOf[String]
          val child_num = map.get("child_num").get.asInstanceOf[String]
          val tableName = TableName.valueOf("user_info")
          val table = connection.getTable(tableName)
          val put = new Put(Bytes.toBytes(rowkey))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("occupation"), Bytes.toBytes(occupation))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile_phone_num"), Bytes.toBytes(mobile_phone_num))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("fix_phone_num"), Bytes.toBytes(fix_phone_num))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("bank_name"), Bytes.toBytes(bank_name))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("marriage"), Bytes.toBytes(marriage))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("child_num"), Bytes.toBytes(child_num))
          Try(table.put(put)).getOrElse(table.close())//将数据写入HBase,若出错关闭table
          table.close()//分区数据写入HBase后关闭连接
        })
    connection.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

(可左右滑动)

4.使用mvn命令将编写好的SparkStreaming代码打包,注意由于工程中有scala代码在编译是命令中需要加scala:compile

代码语言:javascript复制
mvn clean scala:compile package

(可左右滑动)

4.流程测试


1.将编译好的SparkStreaming应用Jar包上传至有Spark Gateway节点的服务器上

conf/0283.properties内容如下:

2.使用spark-submit命令提交SparkStreaming作业

代码语言:javascript复制
spark-submit --class com.cloudera.streaming.Kafka2Spark2HBase 
--master yarn-client --num-executors 2 --driver-memory 1g 
--driver-cores 1 --executor-memory 1g --executor-cores 1 
spark-demo-1.0-SNAPSHOT.jar

(可左右滑动)

通过CM查看SparkStreaming作业是否正常运行

Yarn的8088界面查看

3.查看HBase中user_info表数据

4.运行脚本向Kafka生产数据

代码语言:javascript复制
[root@cdh01 0283-kafka-shell]# cd /root/0283-kafka-shell
[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt

(可左右滑动)

5.通过Hue查看HBase的user_info表数据

Kafka的数据已成功的录入到HBase的user_info表中

HBase 命令行查看数据

5.总结


1.由于Spark中默认没有Spark-Streaming-Kafka的依赖包,需要将相应的依赖包添加到/opt/cloudera/parcels/CDH/jars目录下,然后在spark-env.sh中配置相应的依赖包路径,否则会报类找不到的异常。

2.在获取HBase的Connection后,完成数据入库后记得close掉,否则在应用运行一段时间后就无法获取的Zookeeper的连接,导致数据无法入库。

GitHub地址:

https://github.com/fayson/cdhproject/blob/master/sparkdemo/src/main/scala/com/cloudera/streaming/Kafka2Spark2HBase.scala

https://github.com/fayson/cdhproject/blob/master/sparkdemo/src/main/scala/com/cloudera/utils/HBaseUtil.scala

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

0 人点赞