第6章 数据摄入
6.1 数据格式
- 摄入规范化数据:JSON、CSV、TSV
- 自定义格式
- 其他格式
6.2 配置
主要是摄入的规则ingestion Spec 摄入规则主要包含3个部分
代码语言:javascript复制{
"dataSchema" : {...},
"ioConfig" : {...},
"tuningConfig" : {...}
}
6.2.1 DataSchema
1、parser (1)string parser
parseSpec两个功能:
- String Parser用parseSpec判定将要处理rows的数据格式( JSON, CSV, TSV)
- 所有的Parsers 用parseSpec判定将要处理rows的 timestamp 和 dimensionsAll
format字段默认为tsv格式 JSON ParseSpec
CSV ParseSpec
TSV ParseSpec
TimestampSpec
DimensionsSpec
2. metricsSpec
一些简单的聚合函数: count 、longSum、longMin、longMax、doubleSum、doubleMin、doubleMax
3、GranularitySpec
代码语言:javascript复制"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name" : "commentLength", "type" : "long" },
{ "name" : "deltaBucket", "type" : "long" },
"flags",
"diffUrl",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec": {
"column": "timestamp",
"format": "iso"
}
}
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2016-06-27/2016-06-28"],
"rollup" : false
}
}
6.2.2 ioConfig
ioConfig 指明了真正具体的数据源
不同的firehose的格式不太一致,以kafka为例
代码语言:javascript复制{
firehose : {
consumerProps : {
auto.commit.enable : false
auto.offset.reset : largest
fetch.message.max.bytes : 1048586
group.id : druid-example
zookeeper.connect : localhost:2181
zookeeper.connect.timeout.ms : 15000
zookeeper.session.timeout.ms : 15000
zookeeper.sync.time.ms : 5000
},
feed : wikipedia
type : kafka-0.8
}
}
ioConfig 的案例:
代码语言:javascript复制"ioConfig" : {
"type" : "index",
"firehose" : {
"type" : "local",
"baseDir" : "quickstart/",
"filter" : "wikipedia-2016-06-27-sampled.json"
},
"appendToExisting" : false
}
6.2.3 tuningConfig
tuningConfig 这部分配置是优化数据输入的过程
代码语言:javascript复制"tuningConfig" : {
"type" : "index",
"targetPartitionSize" : 5000000,
"maxRowsInMemory" : 25000,
"forceExtendableShardSpecs" : true
}
6.3 从hadoop加载数据
6.3.1加载数据
批量摄取维基百科样本数据,文件位于quickstart/wikipedia-2016-06-27-sampled.json。使用quickstart/wikipedia-index-hadoop.json 摄取任务文件。
代码语言:javascript复制 bin/post-index-task --file quickstart/wikipedia-index-hadoop.json
此命令将启动Druid Hadoop摄取任务。 摄取任务完成后,数据将由历史节点加载,并可在一两分钟内进行查询。
6.3.2查询数据
6.4 从kafka加载数据
6.4.1 准备kafka
- 启动kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties
- 创建wikipedia主题
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 –topic
wikipedia --partitions 1 --replication-factor 1 –create
Created topic "wikipedia".
- 查看主题是否创建成功
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
__consumer_offsets
first
wikipedia
6.4.2 启动索引服务
我们将使用Druid的Kafka索引服务从我们新创建的维基百科主题中提取消息。要启动该服务,我们需要通过从Imply目录运行以下命令向
代码语言:javascript复制Druid的overlord提交supervisor spec
[atguigu@hadoop102 imply-2.7.10]$ curl -XPOST -H'Content-Type: application/json' -d
@quickstart/wikipedia-kafka-supervisor.json http://hadoop102:8090/druid/indexer/v1/supervisor
说明: curl是一个利用URL规则在命令行下工作的文件传输工具。它支持文件的上传和下载,所以是综合传输工具。
- -X 为 HTTP 数据包指定一个方法,比如 PUT、DELETE。默认的方法是 GET 6.4.3
- -H 为 HTTP 数据包指定 Header 字段内容
- -d 为 POST 数据包指定要向 HTTP 服务器发送的数据并发送出去,如果的内容以符号 @ 开头,其后的字符串将被解析为文件名,curl 命令会从这个文件中读取数据发送。
6.4.3 加载历史数据
启动kafka生产者生产数据
代码语言:javascript复制[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 –
-topic wikipedia < /opt/module/imply-2.7.10/quickstart/wikipedia-2016-06-27-sampled.json
说明: < 将文件作为命令输入 可在kafka本地看到相应的数据生成
代码语言:javascript复制[atguigu@hadoop103 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu 4096 3月 30 11:16 wikipedia-0
[atguigu@hadoop103 logs]$ pwd
/opt/module/kafka/logs
将样本事件发布到Kafka的wikipedia主题,然后由Kafka索引服务将其提取到Druid中。你现在准备运行一些查询了!
6.4.4 加载实时数据
下载一个帮助应用程序,该应用程序将解析维基媒体的IRC提要中的event,并将这些event发布到我们之前设置的Kafka的wikipedia主题中。
代码语言:javascript复制[atguigu@hadoop102 imply-2.7.10]$ curl -O
https://static.imply.io/quickstart/wikiticker-0.4.tar.gz
说明: -O 在本地保存获取的数据时,使用她们在远程服务器上的文件名进行保存。
代码语言:javascript复制[atguigu@hadoop102 imply-2.7.10]$ tar -zxvf wikiticker-0.4.tar.gz
[atguigu@hadoop102 imply-2.7.10]$ cd wikiticker-0.4
现在运行带有参数的wikiticker,指示它将输出写入我们的Kafka主题:
[atguigu@hadoop102 wikiticker-0.4]$ bin/wikiticker -J-Dfile.encoding=UTF-8 -out kafka –
topic Wikipedia
查询多次,对比结果的变化
6.4.5 加载自定义kafka主题数据
可以通过编写自定义supervisor spec来加载自己的数据集。要自定义受监督的Kafka索引服务提取,您可以将包含的quickstart/wikipedia-kafka-supervisor.json 规范复制到自己的文件中,根据需要进行编辑,并根据需要创建或关闭管理程序。没有必要自己重启Imply或Druid服务。