Express Logistics Big Data Project (53): Real-Time ETL Module Development Preparation


Contents

Real-Time ETL Module Development Preparation

1. Writing the Configuration File

2. Creating the Package Structure

3. Writing a Utility Class to Load the Configuration File

Real-Time ETL Module Development Preparation

1. Writing the Configuration File

  • Create the configuration file config.properties under the resources directory of the common module:
# CDH-6.2.1
bigdata.host=node2
# HDFS
dfs.uri=hdfs://node2:8020
# Local FS
local.fs.uri=file://
# Kafka
kafka.broker.host=node2
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm
# ZooKeeper
zookeeper.host=node2
zookeeper.port=2181
# Kudu
kudu.rpc.host=node2
kudu.rpc.port=7051
kudu.http.host=node2
kudu.http.port=8051
# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2:8123/logistics?characterEncoding=utf-8&useSSL=false
clickhouse.user=root
clickhouse.password=123456
# ElasticSearch
elasticsearch.host=node2
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200
# Azkaban
app.first.runnable=true
# Oracle JDBC
db.oracle.url=jdbc:oracle:thin:@//192.168.88.10:1521/ORCL
db.oracle.user=root
db.oracle.password=123456
# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456
## Data path of ETL program output ##
# Run in YARN mode on Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars
# Run in local mode on Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars
# Run in local mode on Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars
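
Before writing the loader in section 3, it is worth checking that this file actually resolves from the classpath. Below is a minimal sketch (the object name ConfigSmokeTest is illustrative, not part of the project code) that loads the file with java.util.Properties and prints one key:

import java.util.Properties

object ConfigSmokeTest {
  def main(args: Array[String]): Unit = {
    // getResourceAsStream resolves against the classpath root, so a file placed in
    // src/main/resources/config.properties of the common module is visible after compilation
    val in = getClass.getClassLoader.getResourceAsStream("config.properties")
    require(in != null, "config.properties not found on the classpath")
    val props = new Properties()
    props.load(in)
    in.close()
    println(props.getProperty("kafka.broker.host")) // expected output: node2
  }
}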

2. Creating the Package Structure

This project uses the Scala programming language, so create a scala source directory and the following packages under it:

Package name                    Description
cn.it.logistics.etl.realtime    Package containing the real-time ETL programs
cn.it.logistics.etl.parser      Package containing the Canal and OGG data parser classes

3. Writing a Utility Class to Load the Configuration File

Implementation steps:

  • Create the Configuration singleton object in the common package under the scala directory of the common module
  • Write the code:
    • Use ResourceBundle.getBundle to obtain the resource bundle
    • Call the bundle's getString method to load each setting from config.properties
    • Add a main method to test whether the utility class reads the settings correctly

Reference code:

package cn.it.logistics.common

import java.util.{Locale, ResourceBundle}

/**
 * Utility class for reading the configuration file
 */
class Configuration {
  /**
   * Resource bundle used to read the configuration file from the classpath
   */
  private val resourceBundle: ResourceBundle = ResourceBundle.getBundle("config", new Locale("zh", "CN"))
  private val sep = ":"

  // CDH-6.2.1
  val bigdataHost: String = resourceBundle.getString("bigdata.host")
  // HDFS
  val dfsUri: String = resourceBundle.getString("dfs.uri")
  // Local FS
  val localFsUri: String = resourceBundle.getString("local.fs.uri")
  // Kafka
  val kafkaBrokerHost: String = resourceBundle.getString("kafka.broker.host")
  val kafkaBrokerPort: Int = Integer.valueOf(resourceBundle.getString("kafka.broker.port"))
  val kafkaInitTopic: String = resourceBundle.getString("kafka.init.topic")
  val kafkaLogisticsTopic: String = resourceBundle.getString("kafka.logistics.topic")
  val kafkaCrmTopic: String = resourceBundle.getString("kafka.crm.topic")
  val kafkaAddress: String = kafkaBrokerHost + sep + kafkaBrokerPort // e.g. node2:9092
  // Spark
  val LOG_OFF = "OFF"
  val LOG_DEBUG = "DEBUG"
  val LOG_INFO = "INFO"
  val LOCAL_HADOOP_HOME = "E:\\softs\\hadoop-3.0.0"
  val SPARK_KAFKA_FORMAT = "kafka"
  val SPARK_KUDU_FORMAT = "kudu"
  val SPARK_ES_FORMAT = "es"
  val SPARK_CLICKHOUSE_FORMAT = "clickhouse"
  // ZooKeeper
  val zookeeperHost: String = resourceBundle.getString("zookeeper.host")
  val zookeeperPort: Int = Integer.valueOf(resourceBundle.getString("zookeeper.port"))
  // Kudu
  val kuduRpcHost: String = resourceBundle.getString("kudu.rpc.host")
  val kuduRpcPort: Int = Integer.valueOf(resourceBundle.getString("kudu.rpc.port"))
  val kuduHttpHost: String = resourceBundle.getString("kudu.http.host")
  val kuduHttpPort: Int = Integer.valueOf(resourceBundle.getString("kudu.http.port"))
  val kuduRpcAddress: String = kuduRpcHost + sep + kuduRpcPort // e.g. node2:7051
  // ClickHouse
  val clickhouseDriver: String = resourceBundle.getString("clickhouse.driver")
  val clickhouseUrl: String = resourceBundle.getString("clickhouse.url")
  val clickhouseUser: String = resourceBundle.getString("clickhouse.user")
  val clickhousePassword: String = resourceBundle.getString("clickhouse.password")
  // ElasticSearch
  val elasticsearchHost: String = resourceBundle.getString("elasticsearch.host")
  val elasticsearchRpcPort: Int = Integer.valueOf(resourceBundle.getString("elasticsearch.rpc.port"))
  val elasticsearchHttpPort: Int = Integer.valueOf(resourceBundle.getString("elasticsearch.http.port"))
  val elasticsearchAddress: String = elasticsearchHost + sep + elasticsearchHttpPort // e.g. node2:9200
  // Azkaban
  val isFirstRunnable = java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))
  // ## Data path of ETL program output ##
  // # Run in YARN mode on Linux
  val sparkAppDfsCheckpointDir = resourceBundle.getString("spark.app.dfs.checkpoint.dir")// /apps/logistics/dat-hdfs/spark-checkpoint
  val sparkAppDfsDataDir = resourceBundle.getString("spark.app.dfs.data.dir")// /apps/logistics/dat-hdfs/warehouse
  val sparkAppDfsJarsDir = resourceBundle.getString("spark.app.dfs.jars.dir")// /apps/logistics/jars
  // # Run in local mode on Linux
  val sparkAppLocalCheckpointDir = resourceBundle.getString("spark.app.local.checkpoint.dir")// /apps/logistics/dat-local/spark-checkpoint
  val sparkAppLocalDataDir = resourceBundle.getString("spark.app.local.data.dir")// /apps/logistics/dat-local/warehouse
  val sparkAppLocalJarsDir = resourceBundle.getString("spark.app.local.jars.dir")// /apps/logistics/jars
  // # Run in local mode on Windows
  val sparkAppWinCheckpointDir = resourceBundle.getString("spark.app.win.checkpoint.dir")// D://apps/logistics/dat-local/spark-checkpoint
  val sparkAppWinDataDir = resourceBundle.getString("spark.app.win.data.dir")// D://apps/logistics/dat-local/warehouse
  val sparkAppWinJarsDir = resourceBundle.getString("spark.app.win.jars.dir")// D://apps/logistics/jars

  val dbOracleUrl = resourceBundle.getString("db.oracle.url")
  val dbOracleUser = resourceBundle.getString("db.oracle.user")
  val dbOraclePassword = resourceBundle.getString("db.oracle.password")
  val dbMySQLDriver = resourceBundle.getString("db.mysql.driver")
  val dbMySQLUrl = resourceBundle.getString("db.mysql.url")
  val dbMySQLUser = resourceBundle.getString("db.mysql.user")
  val dbMySQLPassword = resourceBundle.getString("db.mysql.password")
}
object Configuration extends Configuration {
  def main(args: Array[String]): Unit = {
    println(Configuration.dbOracleUrl)
    println(Configuration.dbMySQLDriver)
    println(Configuration.dbMySQLUrl)
    println(Configuration.dbMySQLPassword)
  }
}
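
With the utility in place, other modules can read settings directly from Configuration. Below is a minimal sketch of how the loaded values might feed a Structured Streaming Kafka source; it assumes the spark-sql-kafka-0-10 connector is on the classpath, and the object name KafkaSourceDemo and the console sink are illustrative rather than part of the project code:

import org.apache.spark.sql.SparkSession
import cn.it.logistics.common.Configuration

object KafkaSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("logistics-etl-demo")
      .master("local[*]")
      .getOrCreate()

    // Subscribe to the logistics topic using the address assembled by Configuration
    val logisticsDF = spark.readStream
      .format(Configuration.SPARK_KAFKA_FORMAT)                       // "kafka"
      .option("kafka.bootstrap.servers", Configuration.kafkaAddress)  // node2:9092
      .option("subscribe", Configuration.kafkaLogisticsTopic)
      .load()

    // Print raw message payloads to the console as a smoke test
    logisticsDF.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}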
