目录
实时ETL模块开发准备
一、编写配置文件
二、创建包结构
三、编写工具类加载配置文件
实时ETL模块开发准备
一、编写配置文件
- 在公共模块的resources目录创建配置文件:config.properties
# CDH-6.2.1
bigdata.host=node2
# HDFS
dfs.uri=hdfs://node2:8020
# Local FS
local.fs.uri=file://
# Kafka
kafka.broker.host=node2
kafka.broker.port=9092
kafka.init.topic=kafka-topics --zookeeper node2:2181/kafka --create --replication-factor 1 --partitions 1 --topic logistics
kafka.logistics.topic=logistics
kafka.crm.topic=crm
# ZooKeeper
zookeeper.host=node2
zookeeper.port=2181
# Kudu
kudu.rpc.host=node2
kudu.rpc.port=7051
kudu.http.host=node2
kudu.http.port=8051
# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.url=jdbc:clickhouse://node2:8123/logistics?characterEncoding=utf-8&useSSL=false
clickhouse.user=root
clickhouse.password=123456
# ElasticSearch
elasticsearch.host=node2
elasticsearch.rpc.port=9300
elasticsearch.http.port=9200
# Azkaban
app.first.runnable=true
# Oracle JDBC
db.oracle.url=jdbc:oracle:thin:@//192.168.88.10:1521/ORCL
db.oracle.user=root
db.oracle.password=123456
# MySQL JDBC
db.mysql.driver=com.mysql.jdbc.Driver
db.mysql.url=jdbc:mysql://192.168.88.10:3306/crm?useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false
db.mysql.user=root
db.mysql.password=123456
## Data path of ETL program output ##
# Run in the yarn mode in Linux
spark.app.dfs.checkpoint.dir=/apps/logistics/dat-hdfs/spark-checkpoint
spark.app.dfs.data.dir=/apps/logistics/dat-hdfs/warehouse
spark.app.dfs.jars.dir=/apps/logistics/jars
# Run in the local mode in Linux
spark.app.local.checkpoint.dir=/apps/logistics/dat-local/spark-checkpoint
spark.app.local.data.dir=/apps/logistics/dat-local/warehouse
spark.app.local.jars.dir=/apps/logistics/jars
# Running in the local Mode in Windows
spark.app.win.checkpoint.dir=D://apps/logistics/dat-local/spark-checkpoint
spark.app.win.data.dir=D://apps/logistics/dat-local/warehouse
spark.app.win.jars.dir=D://apps/logistics/jars
二、创建包结构
本次项目采用scala编程语言,因此创建scala目录
包名 | 说明 |
---|---|
cn.it.logistics.etl.realtime | 实时ETL程序所在包 |
cn.it.logistics.etl.parser | Canal和Ogg数据解析类所在包 |
三、编写工具类加载配置文件
实现步骤:
- 在公共模块的scala目录下的common包中创建 Configuration 单例对象
- 编写代码
- 使用 ResourceBundle.getBundle 获取配置对象
- 调用config.getString方法加载 config.properties 配置
- 添加一个 main 方法测试,工具类是否能够正确读取出配置项
参考代码:
package cn.it.logistics.common
import java.util.{Locale, ResourceBundle}
/**
* 读取配置文件的工具类
*/
/**
 * Utility class that reads the `config` resource bundle (config.properties)
 * and exposes every entry as an immutable member.
 *
 * All values are read eagerly while an instance is being constructed, so a
 * missing bundle or key, or a non-numeric port, surfaces at construction time
 * rather than at first use.
 */
class Configuration {

  /** Handle on the `config` resource bundle, looked up with the zh_CN locale. */
  private val resourceBundle: ResourceBundle =
    ResourceBundle.getBundle("config", new Locale("zh", "CN"))

  /** Separator placed between host and port when building an address. */
  private val sep = ":"

  /**
   * Reads the value stored under `key` and parses it as an integer.
   * Surrounding whitespace is trimmed first because `.properties` values keep
   * trailing blanks, which would otherwise make the parse fail.
   *
   * @param key property name in the bundle
   * @return the parsed integer value
   * @throws NumberFormatException if the value is not a valid integer
   */
  private def intValue(key: String): Int = resourceBundle.getString(key).trim.toInt

  /**
   * Joins a host and a port into a `host:port` string.
   *
   * @param host host name
   * @param port port number
   * @return `host`, the separator and `port` concatenated
   */
  private def address(host: String, port: Int): String = s"$host$sep$port"

  // CDH-6.2.1
  val bigdataHost: String = resourceBundle.getString("bigdata.host")

  // HDFS
  val dfsUri: String = resourceBundle.getString("dfs.uri")

  // Local FS
  val localFsUri: String = resourceBundle.getString("local.fs.uri")

  // Kafka
  val kafkaBrokerHost: String = resourceBundle.getString("kafka.broker.host")
  val kafkaBrokerPort: Int = intValue("kafka.broker.port")
  val kafkaInitTopic: String = resourceBundle.getString("kafka.init.topic")
  val kafkaLogisticsTopic: String = resourceBundle.getString("kafka.logistics.topic")
  val kafkaCrmTopic: String = resourceBundle.getString("kafka.crm.topic")
  val kafkaAddress: String = address(kafkaBrokerHost, kafkaBrokerPort)

  // Spark
  val LOG_OFF = "OFF"
  val LOG_DEBUG = "DEBUG"
  val LOG_INFO = "INFO"
  // Backslashes must be escaped inside a regular string literal.
  val LOCAL_HADOOP_HOME = "E:\\softs\\hadoop-3.0.0"
  val SPARK_KAFKA_FORMAT = "kafka"
  val SPARK_KUDU_FORMAT = "kudu"
  val SPARK_ES_FORMAT = "es"
  val SPARK_CLICKHOUSE_FORMAT = "clickhouse"

  // ZooKeeper
  val zookeeperHost: String = resourceBundle.getString("zookeeper.host")
  val zookeeperPort: Int = intValue("zookeeper.port")

  // Kudu
  val kuduRpcHost: String = resourceBundle.getString("kudu.rpc.host")
  val kuduRpcPort: Int = intValue("kudu.rpc.port")
  val kuduHttpHost: String = resourceBundle.getString("kudu.http.host")
  val kuduHttpPort: Int = intValue("kudu.http.port")
  val kuduRpcAddress: String = address(kuduRpcHost, kuduRpcPort)

  // ClickHouse
  val clickhouseDriver: String = resourceBundle.getString("clickhouse.driver")
  val clickhouseUrl: String = resourceBundle.getString("clickhouse.url")
  val clickhouseUser: String = resourceBundle.getString("clickhouse.user")
  val clickhousePassword: String = resourceBundle.getString("clickhouse.password")

  // ElasticSearch
  val elasticsearchHost: String = resourceBundle.getString("elasticsearch.host")
  val elasticsearchRpcPort: Int = intValue("elasticsearch.rpc.port")
  val elasticsearchHttpPort: Int = intValue("elasticsearch.http.port")
  // The address is built from the HTTP port, not the RPC port.
  val elasticsearchAddress: String = address(elasticsearchHost, elasticsearchHttpPort)

  // Azkaban
  val isFirstRunnable: java.lang.Boolean =
    java.lang.Boolean.valueOf(resourceBundle.getString("app.first.runnable"))

  // ## Data path of ETL program output ##
  // # Run in the yarn mode in Linux
  val sparkAppDfsCheckpointDir: String = resourceBundle.getString("spark.app.dfs.checkpoint.dir") // /apps/logistics/dat-hdfs/spark-checkpoint
  val sparkAppDfsDataDir: String = resourceBundle.getString("spark.app.dfs.data.dir") // /apps/logistics/dat-hdfs/warehouse
  val sparkAppDfsJarsDir: String = resourceBundle.getString("spark.app.dfs.jars.dir") // /apps/logistics/jars

  // # Run in the local mode in Linux
  val sparkAppLocalCheckpointDir: String = resourceBundle.getString("spark.app.local.checkpoint.dir") // /apps/logistics/dat-local/spark-checkpoint
  val sparkAppLocalDataDir: String = resourceBundle.getString("spark.app.local.data.dir") // /apps/logistics/dat-local/warehouse
  val sparkAppLocalJarsDir: String = resourceBundle.getString("spark.app.local.jars.dir") // /apps/logistics/jars

  // # Running in the local Mode in Windows
  val sparkAppWinCheckpointDir: String = resourceBundle.getString("spark.app.win.checkpoint.dir") // D://apps/logistics/dat-local/spark-checkpoint
  val sparkAppWinDataDir: String = resourceBundle.getString("spark.app.win.data.dir") // D://apps/logistics/dat-local/warehouse
  val sparkAppWinJarsDir: String = resourceBundle.getString("spark.app.win.jars.dir") // D://apps/logistics/jars

  // Oracle JDBC
  val dbOracleUrl: String = resourceBundle.getString("db.oracle.url")
  val dbOracleUser: String = resourceBundle.getString("db.oracle.user")
  val dbOraclePassword: String = resourceBundle.getString("db.oracle.password")

  // MySQL JDBC
  val dbMySQLDriver: String = resourceBundle.getString("db.mysql.driver")
  val dbMySQLUrl: String = resourceBundle.getString("db.mysql.url")
  val dbMySQLUser: String = resourceBundle.getString("db.mysql.user")
  val dbMySQLPassword: String = resourceBundle.getString("db.mysql.password")
}
/**
 * Singleton instance of the Configuration class, giving callers static-style
 * access to the loaded settings (e.g. `Configuration.dbMySQLUrl`).
 */
object Configuration extends Configuration {

  /**
   * Manual smoke test: prints a handful of loaded settings to standard output,
   * one per line, to confirm that the configuration file can be read.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val samples = Seq(
      Configuration.dbOracleUrl,
      Configuration.dbMySQLDriver,
      Configuration.dbMySQLUrl,
      Configuration.dbMySQLPassword
    )
    samples.foreach(println)
  }
}