目录
将消费的kafka数据转换成bean对象
一、将OGG数据转换成bean对象
二、将Canal数据转换成bean对象
三、完整代码
将消费的kafka数据转换成bean对象
一、将OGG数据转换成bean对象
实现步骤:
- 消费kafka的 logistics Topic数据
- 将消费到的数据转换成OggMessageBean对象
- 递交作业启动运行
实现过程:
- 消费kafka的 logistics Topic数据
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
- 将消费到的数据转换成OggMessageBean对象
- 默认情况下表名带有数据库名,因此需要删除掉数据库名
//3.1:物流相关数据的转换
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
iters.map(row => {
//获取到value列的值(字符串)
val jsonStr: String = row.getAs[String](0)
//将字符串转换成javabean对象
JSON.parseObject(jsonStr, classOf[OggMessageBean])
}).toList.iterator
})(Encoders.bean(classOf[OggMessageBean]))
- 递交作业启动运行
// 设置Streaming应用输出及启动
logisticsDF.writeStream.outputMode(OutputMode.Update())
.format("console").queryName("logistics").start()
二、将Canal数据转换成bean对象
实现步骤:
- 消费kafka的 crm Topic数据
- 将消费到的数据转换成 CanalMessageBean 对象
- 递交作业启动运行
实现过程:
- 消费kafka的 crm Topic数据
//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
- 将消费到的数据转换成CanalMessageBean 对象
//3.2:客户关系相关数据的转换
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
//canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
iters.filter(row=>{
//取到value列的数据
val line: String = row.getAs[String](0)
//如果value列的值不为空,且是清空表的操作
if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
}).map(row=>{
//取到value列的数据
val jsonStr: String = row.getAs[String](0)
//将json字符串转换成javaBean对象
JSON.parseObject(jsonStr, classOf[CanalMessageBean])
}).toList.toIterator
})(Encoders.bean(classOf[CanalMessageBean]))
- 递交作业启动运行
crmDF.writeStream.outputMode(OutputMode.Update())
.format("console").queryName("crm").start()
三、完整代码
代码语言:javascript复制package cn.it.logistics.etl.realtime
import java.sql.Connection
import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.etl.parser.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
/**
* 实现KUDU数据库的实时ETL操作
*/
object KuduStreamApp2 extends StreamApp {
/**
* 入口方法
* @param args
*/
def main(args: Array[String]): Unit = {
//创建sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(this.getClass.getSimpleName)
)
//数据处理
execute(sparkConf)
}
/**
* 数据的处理
*
* @param sparkConf
*/
override def execute(sparkConf: SparkConf): Unit = {
/**
* 实现步骤:
* 1)创建sparksession对象
* 2)获取数据源(获取物流相关数据以及crm相关数据)
* 3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
* 4)抽取每条数据的字段信息
* 5)将过滤出来的每张表写入到kudu数据库
*/
//1)创建sparksession对象
val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//2)获取数据源(获取物流相关数据以及crm相关数据)
//2.1:获取物流系统相关的数据
val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
//2.2:获取客户关系系统相关的数据
val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)
//导入隐式转换
import sparkSession.implicits._
//导入自定义的POJO的隐士转换
import cn.itcast.logistics.common.BeanImplicit._
//3)对数据进行处理(返回的数据是字符串类型,需要转换成javabean对象)
//3.1:物流相关数据的转换
val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
iters.map(row => {
//获取到value列的值(字符串)
val jsonStr: String = row.getAs[String](0)
//将字符串转换成javabean对象
JSON.parseObject(jsonStr, classOf[OggMessageBean])
}).toList.iterator
})(Encoders.bean(classOf[OggMessageBean]))
//3.2:客户关系相关数据的转换
val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters=>{
//canal同步的数据除了增删改操作以外,还有清空表数据的操作,因此将清空表数据的操作过滤掉
iters.filter(row=>{
//取到value列的数据
val line: String = row.getAs[String](0)
//如果value列的值不为空,且是清空表的操作
if(line!=null && line.toUpperCase().contains("TRUNCATE")) false else true
}).map(row=>{
//取到value列的数据
val jsonStr: String = row.getAs[String](0)
//将json字符串转换成javaBean对象
JSON.parseObject(jsonStr, classOf[CanalMessageBean])
}).toList.toIterator
})(Encoders.bean(classOf[CanalMessageBean]))
//输出数据
/**
* -------------------- -------------------- -------------------- -------------------- ------- -------------------- -------------------
* | after| before| current_ts| op_ts|op_type| pos| table|
* -------------------- -------------------- -------------------- -------------------- ------- -------------------- -------------------
* |[eid -> [], cdt -...|[eid -> [], cdt -...|2020-10-10T02:35:...|2020-10-10 02:35:...| U|00000000200006647808|tbl_collect_package|
* -------------------- -------------------- -------------------- -------------------- ------- -------------------- -------------------
*/
logsticsMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("logistics").start()
/**
* -------------------- -------- ----- ------------- --- -------------------- ------------------ --- -------------------- ----------- ------------- ------
* | data|database| ddl| es| id| mysqlType| old|sql| sqlType| table| ts| type|
* -------------------- -------- ----- ------------- --- -------------------- ------------------ --- -------------------- ----------- ------------- ------
* |[[cdt -> [], gis_...| crm|false|1602297244000| 18|[cdt -> [], gis_a...|[ {"gis_addr":"1"}]| |[cdt -> [], gis_a...|tbl_address|1602297244211|UPDATE|
* -------------------- -------- ----- ------------- --- -------------------- ------------------ --- -------------------- ----------- ------------- ------
*/
crmMessageBean.writeStream.outputMode(OutputMode.Update()).format("console").queryName("crm").start()
//8)启动运行等待停止
val stream = sparkSession.streams
//stream.active:获取当前活动流式查询的列表
stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
//线程阻塞,等待终止
stream.awaitAnyTermination()
}
/**
* 数据的保存
* @param dataFrame
* @param tableName
* @param isAutoCreateTable
*/
override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
}