We use Apache Flume to collect the data into Kafka for storage, and finally display it in ELK. Download Apache Flume from http://flume.apache.org/ and deploy it on the log server; after downloading, extract it and add it to the environment variables. The overall idea is to search for "测试开发工程师" (Test Development Engineer) on Lagou, store the retrieved results in Kafka, and finally display them in ELK. The detailed configuration follows. Edit a file in the conf directory with the following content:
# Set the agent component names
agent.sources=s1
agent.channels=c1
agent.sinks=k1
# Set the collection method: tail the application log
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
# Set the Kafka sink
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Set the Kafka broker and port
agent.sinks.k1.brokerList=localhost:9092
# Set the Kafka topic
agent.sinks.k1.topic=laGou
# Set the serializer
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# Specify the channel the sink reads from
agent.sinks.k1.channel=c1
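Before starting Flume, make sure the Kafka broker is running and that the laGou topic exists. A minimal sketch for creating it, assuming a local single-broker Kafka 1.x with ZooKeeper on localhost:2181 (six partitions, to match the partition list that shows up later in the LogStash output):
# assumption: kafka-topics.sh is on the PATH and ZooKeeper runs on localhost:2181
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic laGou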
The topic used here is laGou; remember that Kafka must already be running at this point. Next, start Apache Flume by executing the following command in the apache-flume/bin directory:
flume-ng agent -n agent --conf conf --conf-file ../conf/flume-kafka.properties -Dflume.root.logger=DEBUG,CONSOLE
After it is executed, Flume's startup information is printed to the console.
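To verify that log lines are actually flowing from Flume into Kafka, one way is to append a test line to the tailed file and watch the topic with the console consumer (a sketch; the script names assume the stock Kafka distribution is on the PATH):
# append a test line to the file that Flume is tailing
echo "flume-to-kafka test $(date)" >> /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log
# read the laGou topic from the beginning to confirm the line arrived
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic laGou --from-beginning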
Next, the data is displayed by splitting the flow: the collected data is stored in Kafka, LogStash consumes the data from Kafka, and the consumed data is then written into ElasticSearch. First configure the logstash.yml file with the LogStash account and password, as follows:
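The original file contents are not reproduced here; the following is a minimal sketch of the relevant logstash.yml lines, assuming X-Pack security is enabled on the Elasticsearch cluster and the built-in elastic user is used (the password is a placeholder):
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: xxxxxx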
Then configure kafka_laGou.conf, whose contents are as follows:
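Since the original contents are not reproduced, here is a minimal sketch of what such a pipeline could look like, based on the Kafka consumer settings and Elasticsearch connection visible in the log output below (topic, bootstrap server, credentials, and index name are assumptions to adapt):
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["laGou"]
    auto_offset_reset => "latest"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "elastic"
    password => "xxxxxx"
    index => "logstash-lagou-%{+YYYY.MM.dd}"
  }
}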
After the configuration is complete, use LogStash from the console to consume the data of the laGou topic in the Kafka cluster. Go to LogStash's bin directory and execute:
./logstash -f ../config/kafka_laGou.conf
After execution, the LogStash agent starts and consumes the data from the Kafka cluster, then stores the consumed data in the ElasticSearch cluster. The output is as follows (note that the first attempt fails with a FATAL error because another LogStash instance is still using the configured data directory; the second attempt then starts successfully):
Sending Logstash's logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties
[2021-06-12T18:39:43,175][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-12T18:39:43,210][FATAL][logstash.runner ] Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting.
[2021-06-12T18:39:43,221][ERROR][org.logstash.Logstash ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
localhost:bin liwangping$ clear
localhost:bin liwangping$ ./logstash -f ../config/kafka_laGou.conf
Sending Logstash's logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties
[2021-06-12T18:40:31,712][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-12T18:40:32,136][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.3.2"}
[2021-06-12T18:40:33,674][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-06-12T18:40:34,092][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}}
[2021-06-12T18:40:34,111][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"}
[2021-06-12T18:40:34,426][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@localhost:9200/"}
[2021-06-12T18:40:34,505][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2021-06-12T18:40:34,508][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2021-06-12T18:40:34,528][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-06-12T18:40:34,544][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2021-06-12T18:40:34,561][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2021-06-12T18:40:34,584][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x53f09319 run>"}
[2021-06-12T18:40:34,670][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2021-06-12T18:40:34,676][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-06-12T18:40:34,691][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = logstash-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-83756
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2021-06-12T18:40:34,797][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.1.0
[2021-06-12T18:40:34,798][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fdcf75ea326b8e07
[2021-06-12T18:40:35,011][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: E0qvXyu_T_Wr_vZgZUV80w
[2021-06-12T18:40:35,024][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
[2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Revoking previously assigned partitions []
[2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] (Re-)joining group
[2021-06-12T18:40:35,047][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-12T18:40:35,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Successfully joined group with generation 1
[2021-06-12T18:40:35,151][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Setting newly assigned partitions [laGou-0, laGou-1, laGou-2, laGou-3, laGou-4, laGou-5]
[2021-06-12T18:40:35,168][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-0 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-1 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-2 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-3 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-4 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-5 to offset 0.
At this point, the consumption details of the laGou topic can be seen in the Kafka monitoring system, as shown below:
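Without a monitoring UI, the same consumption details can be checked from the command line with the consumer-groups tool (a sketch; the group id is the one LogStash reported in the log above):
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-83756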
Next comes data visualization: once the data is stored in the ElasticSearch cluster, it can be queried and analyzed with Kibana. After creating an index pattern under Management, click the Discover module, and the consumed Lagou test-development-position data is displayed, as shown below:
You can also query on different fields; for example, querying on message displays the following information:
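The same kind of filter can also be issued directly against Elasticsearch, which is handy for spot-checking the data outside Kibana (a sketch; the index pattern, search keyword, and password are placeholders):
curl -u elastic:xxxxxx -H 'Content-Type: application/json' "http://localhost:9200/logstash-*/_search?pretty" -d '{"query":{"match":{"message":"测试开发"}}}'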
Of course, you can also click the right-pointing arrow to expand a record and view the complete document in either table format or JSON format.