客快物流大数据项目(六十):将消费的kafka数据转换成bean对象

2022-03-09 09:02:24 浏览数 (1)

目录

将消费的kafka数据转换成bean对象

一、将OGG数据转换成bean对象

二、​​​​​​​将Canal数据转换成bean对象

三、完整代码

将消费的kafka数据转换成bean对象

一、​​​​​​​将OGG数据转换成bean对象

实现步骤:

  • 消费kafka的 logistics Topic数据
  • 将消费到的数据转换成OggMessageBean对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 logistics Topic数据
代码语言:javascript复制
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
  • 将消费到的数据转换成OggMessageBean对象
    • 默认情况下表名带有数据库名,因此需要删除掉数据库名
代码语言:javascript复制
//3.1:物流相关数据的转换
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
  iters.map(row => {
    //获取到value列的值(字符串)
    val jsonStr: String = row.getAs[String](0)
    //将字符串转换成javabean对象
    JSON.parseObject(jsonStr, classOf[OggMessageBean])
  }).toList.iterator
})(Encoders.bean(classOf[OggMessageBean]))
  • 递交作业启动运行
代码语言:javascript复制
// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("logistics").start()

二、​​​​​​​将Canal数据转换成bean对象

实现步骤:

  • 消费kafka的 crm Topic数据
  • 将消费到的数据转换成 CanalMessageBean 对象
  • 递交作业启动运行

实现过程:

  • 消费kafka的 crm Topic数据
代码语言:javascript复制
//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
  • 将消费到的数据转换成CanalMessageBean 对象
代码语言:javascript复制
//3.2:客户关系相关数据的转换
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
  //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
  iters.filter(row=>{
    //取到value列的数据
    val line: String = row.getAs[String](0)
    //如果value列的值不为空,且是清空表的操作
    if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
  }).map(row=>{
    //取到value列的数据
    val jsonStr: String = row.getAs[String](0)
    //将json字符串转换成javaBean对象
    JSON.parseObject(jsonStr, classOf[CanalMessageBean])
  }).toList.toIterator
})(Encoders.bean(classOf[CanalMessageBean]))
  • 递交作业启动运行
代码语言:javascript复制
crmDF.writeStream.outputMode(OutputMode.Update())
  .format("console").queryName("crm").start()

三、完整代码

代码语言:javascript复制
package cn.it.logistics.etl.realtime
import java.sql.Connection

import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.etl.parser.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

/**
 * 实现KUDU数据库的实时ETL操作
 */
object KuduStreamApp2 extends StreamApp {

  /**
   * 入口方法
   * @param args
   */
  def main(args: Array[String]): Unit = {
    //创建sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(this.getClass.getSimpleName)
    )

    //数据处理
    execute(sparkConf)
  }

  /**
   * 数据的处理
   *
   * @param sparkConf
   */
  override def execute(sparkConf: SparkConf): Unit = {
    /**
     * 实现步骤:
     * 1)创建sparksession对象
     * 2)获取数据源(获取物流相关数据以及crm相关数据)
     * 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
     * 4)抽取每条数据的字段信息
     * 5)将过滤出来的每张表写入到kudu数据库
     */
    //1)创建sparksession对象
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //2)获取数据源(获取物流相关数据以及crm相关数据)
    //2.1:获取物流系统相关的数据
    val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)

    //2.2:获取客户关系系统相关的数据
    val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

    //导入隐式转换
    import  sparkSession.implicits._

    //导入自定义的POJO的隐士转换
    import  cn.itcast.logistics.common.BeanImplicit._

    //3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
    //3.1:物流相关数据的转换
    val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.map(row => {
        //获取到value列的值(字符串)
        val jsonStr: String = row.getAs[String](0)
        //将字符串转换成javabean对象
        JSON.parseObject(jsonStr, classOf[OggMessageBean])
      }).toList.iterator
    })(Encoders.bean(classOf[OggMessageBean]))

    //3.2:客户关系相关数据的转换
    val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
      //canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
      iters.filter(row=>{
        //取到value列的数据
        val line: String = row.getAs[String](0)
        //如果value列的值不为空,且是清空表的操作
        if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
      }).map(row=>{
        //取到value列的数据
        val jsonStr: String = row.getAs[String](0)
        //将json字符串转换成javaBean对象
        JSON.parseObject(jsonStr, classOf[CanalMessageBean])
      }).toList.toIterator
    })(Encoders.bean(classOf[CanalMessageBean]))

    //输出数据
    /**
     *  -------------------- -------------------- -------------------- -------------------- ------- -------------------- ------------------- 
     * |               after|              before|          current_ts|               op_ts|op_type|                 pos|              table|
     *  -------------------- -------------------- -------------------- -------------------- ------- -------------------- ------------------- 
     * |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...|      U|00000000200006647808|tbl_collect_package|
     *  -------------------- -------------------- -------------------- -------------------- ------- -------------------- ------------------- 
     */
    logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()

    /**
     *  -------------------- -------- ----- ------------- --- -------------------- ------------------ --- -------------------- ----------- ------------- ------ 
     * |                data|database|  ddl|           es| id|           mysqlType|               old|sql|             sqlType|      table|           ts|  type|
     *  -------------------- -------- ----- ------------- --- -------------------- ------------------ --- -------------------- ----------- ------------- ------ 
     * |[[cdt -> [], gis_...|     crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]|   |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
     *  -------------------- -------- ----- ------------- --- -------------------- ------------------ --- -------------------- ----------- ------------- ------ 
     */
    crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()

    //8)启动运行等待停止
    val stream = sparkSession.streams
    //stream.active:获取当前活动流式查询的列表
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    //线程阻塞,等待终止
    stream.awaitAnyTermination()
  }

  /**
   * 数据的保存
   * @param dataFrame
   * @param tableName
   * @param isAutoCreateTable
   */
  override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  }

0 人点赞