快速学习-Druid数据摄入

2022-11-30 08:55:42 浏览数 (1)

第6章 数据摄入

6.1 数据格式

  1. 摄入规范化数据:JSON、CSV、TSV
  2. 自定义格式
  3. 其他格式

6.2 配置

主要是摄入的规则ingestion Spec 摄入规则主要包含3个部分

代码语言:javascript复制
{
  "dataSchema" : {...},
  "ioConfig" : {...},
  "tuningConfig" : {...}
}

6.2.1 DataSchema

1、parser (1)string parser

parseSpec两个功能:

  • String Parser用parseSpec判定将要处理rows的数据格式( JSON, CSV, TSV)
  • 所有的Parsers 用parseSpec判定将要处理rows的 timestamp 和 dimensionsAll

format字段默认为tsv格式 JSON ParseSpec

CSV ParseSpec

TSV ParseSpec

TimestampSpec

DimensionsSpec

2. metricsSpec

一些简单的聚合函数: count 、longSum、longMin、longMax、doubleSum、doubleMin、doubleMax

3、GranularitySpec

代码语言:javascript复制
"dataSchema" : {
  "dataSource" : "wikipedia",
  "parser" : {
    "type" : "string",
    "parseSpec" : {
      "format" : "json",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name" : "commentLength", "type" : "long" },
          { "name" : "deltaBucket", "type" : "long" },
          "flags",
          "diffUrl",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      }
    }
  },
  "metricsSpec" : [],
  "granularitySpec" : {
    "type" : "uniform",
    "segmentGranularity" : "day",
    "queryGranularity" : "none",
    "intervals" : ["2016-06-27/2016-06-28"],
    "rollup" : false
  }
}

6.2.2 ioConfig

ioConfig 指明了真正具体的数据源

不同的firehose的格式不太一致,以kafka为例

代码语言:javascript复制
{
    firehose : {
        consumerProps : {
            auto.commit.enable : false
            auto.offset.reset : largest
            fetch.message.max.bytes : 1048586
            group.id : druid-example
            zookeeper.connect : localhost:2181
                       zookeeper.connect.timeout.ms : 15000
                        zookeeper.session.timeout.ms : 15000
                        zookeeper.sync.time.ms : 5000
        },
        feed : wikipedia
        type : kafka-0.8
    }
}

ioConfig 的案例:

代码语言:javascript复制
"ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "quickstart/",
        "filter" : "wikipedia-2016-06-27-sampled.json"
      },
      "appendToExisting" : false
    }

6.2.3 tuningConfig

tuningConfig 这部分配置是优化数据输入的过程

代码语言:javascript复制
"tuningConfig" : {
      "type" : "index",
      "targetPartitionSize" : 5000000,
      "maxRowsInMemory" : 25000,
      "forceExtendableShardSpecs" : true
}

6.3 从hadoop加载数据

6.3.1加载数据

批量摄取维基百科样本数据,文件位于quickstart/wikipedia-2016-06-27-sampled.json。使用quickstart/wikipedia-index-hadoop.json 摄取任务文件。

代码语言:javascript复制
 bin/post-index-task --file quickstart/wikipedia-index-hadoop.json

此命令将启动Druid Hadoop摄取任务。 摄取任务完成后,数据将由历史节点加载,并可在一两分钟内进行查询。

6.3.2查询数据

6.4 从kafka加载数据

6.4.1 准备kafka

  1. 启动kafka
代码语言:javascript复制
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties
  1. 创建wikipedia主题
代码语言:javascript复制
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 –topic
wikipedia --partitions 1 --replication-factor 1 –create
Created topic "wikipedia".
  1. 查看主题是否创建成功
代码语言:javascript复制
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
__consumer_offsets
first
wikipedia

6.4.2 启动索引服务

我们将使用Druid的Kafka索引服务从我们新创建的维基百科主题中提取消息。要启动该服务,我们需要通过从Imply目录运行以下命令向

代码语言:javascript复制
Druid的overlord提交supervisor spec
[atguigu@hadoop102 imply-2.7.10]$ curl -XPOST -H'Content-Type: application/json' -d
 @quickstart/wikipedia-kafka-supervisor.json http://hadoop102:8090/druid/indexer/v1/supervisor

说明: curl是一个利用URL规则在命令行下工作的文件传输工具。它支持文件的上传和下载,所以是综合传输工具。

  • -X 为 HTTP 数据包指定一个方法,比如 PUT、DELETE。默认的方法是 GET 6.4.3
  • -H 为 HTTP 数据包指定 Header 字段内容
  • -d 为 POST 数据包指定要向 HTTP 服务器发送的数据并发送出去,如果的内容以符号 @ 开头,其后的字符串将被解析为文件名,curl 命令会从这个文件中读取数据发送。

6.4.3 加载历史数据

启动kafka生产者生产数据

代码语言:javascript复制
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 –
-topic wikipedia < /opt/module/imply-2.7.10/quickstart/wikipedia-2016-06-27-sampled.json

说明: < 将文件作为命令输入 可在kafka本地看到相应的数据生成

代码语言:javascript复制
[atguigu@hadoop103 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu   4096 3月  30 11:16 wikipedia-0
 [atguigu@hadoop103 logs]$ pwd
/opt/module/kafka/logs

将样本事件发布到Kafka的wikipedia主题,然后由Kafka索引服务将其提取到Druid中。你现在准备运行一些查询了!

6.4.4 加载实时数据

下载一个帮助应用程序,该应用程序将解析维基媒体的IRC提要中的event,并将这些event发布到我们之前设置的Kafka的wikipedia主题中。

代码语言:javascript复制
[atguigu@hadoop102 imply-2.7.10]$ curl -O
 https://static.imply.io/quickstart/wikiticker-0.4.tar.gz

说明: -O 在本地保存获取的数据时,使用她们在远程服务器上的文件名进行保存。

代码语言:javascript复制
[atguigu@hadoop102 imply-2.7.10]$ tar -zxvf wikiticker-0.4.tar.gz
[atguigu@hadoop102 imply-2.7.10]$ cd wikiticker-0.4
现在运行带有参数的wikiticker,指示它将输出写入我们的Kafka主题:
[atguigu@hadoop102 wikiticker-0.4]$ bin/wikiticker -J-Dfile.encoding=UTF-8 -out kafka –
topic Wikipedia

查询多次,对比结果的变化

6.4.5 加载自定义kafka主题数据

可以通过编写自定义supervisor spec来加载自己的数据集。要自定义受监督的Kafka索引服务提取,您可以将包含的quickstart/wikipedia-kafka-supervisor.json 规范复制到自己的文件中,根据需要进行编辑,并根据需要创建或关闭管理程序。没有必要自己重启Imply或Druid服务。

0 人点赞