分布式日志处理:ELK+Kafka实现日志收集

2024-08-06 13:49:24 浏览数 (2)

分布式日志处理:

Sleuth实现微服务跟踪

ELK Kafka实现日志收集系统

背景:

主要针对,分布式项目!对多个模块的日志进行管理… 在传统项目中,如果在生产环境中,有多台不同的服务器集群, 如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常低下。

原理:

可以使用 ELK Kafka实现日志收集系统

  • 应用系统发送日志信息给Kafka,
  • 然后Logstash从Kafka中读取消息在转发到Elasticsearch中保存。
  • 最后Kibana从Elasticsearch读取数据并通过web界面展示出来。
  • Kafka相当于Logstashinput端,这个除了使用Kafka,还可以使用FileRedis等等。
Kafka
  • 高吞吐(海量读写数据,缺点:不支持对象类型传输...)、分布式消息系统`
  • 概念:Producer:生产者(消息的来源) Consumer:消费者(消息输出) Topic:主题(消息传递的约定)
  • 消息系统介绍 一个消息系统负责将数据从一个应用传递到另外一个应用, 应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。 有两种主要的消息传递模式:点对点传递模式、 发布-订阅模式。(kafka采用)
ELK: 是三个开源软件的缩写,对应着三个技术:
  • E—— Elasticsearch 是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。 它的特点有: 分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
  • L—— Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式 一般工作方式为c/s架构: client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
  • K—— Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面 可以帮助汇总、分析和搜索重要数据日志。
  • 新增了一个 FileBeat,它是一个轻量级的日志收集处理工具(Agent) Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。 Filebeat隶属于Beats。目前Beats包含四种工具 Packetbeat(搜集网络流量数据) Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据) Filebeat(搜集文件数据) Winlogbeat(搜集 Windows 事件日志数据) Logstash Kibana

Linux环境搭建:

不需要朋友之间跳过即可!应用开发 这里一般公司中,都会有专业的运维人员来进行搭建,个人学习就顺手学习了整理了笔记! 适合个人学习搭建

  • 真实的环境安装也大致如此,当然本人不是专业滴,错误地方希望提醒学习!
  • 本人VM虚拟机的,网络连接方式,采用 NAT网络地址转换 不细致讲解了… 如果是Bridged Adapter桥接可能会麻烦…
  • 本人喜欢工具都安装在一个目录下, 方便查找:cd /usrl/local
  • 别忘了关闭防火墙!

Elasticsearch

  • elasticsearch-6.2.4.tar.gz 安装文件
  • elasticsearch-analysis-ik-6.2.4.zip 安装中文文件
代码语言:javascript复制
sudo sysctl -w vm.max_map_count=262144      	#设置内存!每次启动都要给个运行内存!
tar -zxvf elasticsearch-6.2.4.tar.gz		 	#指定目录下 解压
#解压完成,设置elasticsearch-6.2.4/config/目录下:elasticsearch.yml文件保存这一些配置信息,端口9200...

#linunx 创建一个组 Elasticsearch 不能使用Root用户,需要由一个独立用户!
groupadd elsearch	#创建一个组
useradd elsearch -g elsearch -p elasticsearch #设置密码

#解压elasticsearch中文安装包!
#unzip unzip 解压命令! zip压缩包解压,需要下载应用! install 在线安装(需要网络!)!
yum install unzip	#下载unzip应用;
#解压目录下文件!
unzip /usr/local/elasticsearch-analysis-ik-6.2.4.zip

#授权用户操作elsearch  
chown -R elsearch:elsearch  elasticsearch-6.2.4
#创建一个目录, 把中午解压包放进去。 就可以显示中文了! 
mkdir -p /usr/local/elasticsearch-6.2.4/plugins/analysis-ik/	#elasticsearch-6.2.4/plugins下创建一个文件analysis-ik
#中文解压文件,直接copy进去
cp -r /usr/local/elasticsearch/* /usr/local/elasticsearch-6.2.4/plugins/analysis-ik/

#Contos还需进行配置:修改 soft    nproc     4096
vi /etc/security/limits.d/20-nproc.conf		#修改 soft    nproc     4096 对应...

vi /etc/security/limits.conf  				#文件添加:↓↓
*               soft    nofile          65536
*               hard    nofile          65536
*               soft    nproc           4096
*               hard    nproc           4096
#授权:
chown -R elsearch:elsearch /usr/local/elasticsearch/
启动 Elasticsearch
代码语言:javascript复制
# 切换用户
su elsearch
# 刷新
source /etc/profile
# elasticsearch  /bin目录下启动 
./elasticsearch				#启动Elasticsearch
  • 根据个人情况使用,不同人的配置用户…都不一样!
  • 启动成功显示端口!
Windows 连接测试:

输入Linux 暴漏 ip 端口

注意 :root 配置永久启用空间,就不用没有启动,赋值运行空间!

启动莫名奇妙报错了: ERROR: [1] bootstrap checks failed [1]: max virtual memory areas vm.max_map_

  • 切换到root用户修改配置sysctl.conf 执行: vi /etc/sysctl.conf
  • 添加下面配置:vm.max_map_count=655360
  • 并执行命令:sysctl -p
  • 然后,重新启动elasticsearch,即可启动成功。 虽然不太清除怎么回事不过,唉!好了!

Logstash

  • logstash-6.3.0.tar.gz
代码语言:javascript复制
#解压
tar -zxvf logstash-6.3.0.tar.gz

#修改配置,logstash-6.3.0/bin/logstash.conf
#修改配置,logstash-6.3.0/config/logstash.yml
#启动...比较慢! 
#/usr/local 是我的路径,注意要根据自己的来!
/usr/local/logstash-6.3.0/bin/logstash -f /usr/local/logstash-6.3.0/bin/logstash.conf
logstash.conf

Logstash从Kafka中读取消息在转发到Elasticsearch中保存。kafka与Elasticsearch 直接通信直接的传输工具!

  • input 读取获取目标,output写入输出目标
  • 修改写入方kafka的 ip 端口 配置,线程,主题…
  • 输出方Elasticsearch 的端口配置…
logstash.yml
  • logstash 对 Elasticsearch 的ip端口引用依赖!
启动:

有点慢…

Kibana

  • kibana-6.2.4-linux-x86_64
代码语言:javascript复制
#解压
tar -zxvf kibana-6.2.4-linux-x86_64
#配置:Elasticsearch 的ip端口引用依赖! /usr/local/kibana-6.2.4-linux-x86_64/config/kibana.yml 文件!
#更改ip

#bin目录启动:
#/usr/local/kibana-6.2.4-linux-x86_64/bin# 
./kibana	#启动
kibana.yml
启动:./kibana

Windows查看图形化界面!默认端口5601

kafka

  • kafka_2.10-0.10.2.1.tgz
代码语言:javascript复制
#root 用户下解压:
tar -zxvf kafka_2.10-0.10.2.1.tgz
#配置:kafka_2.10-0.10.2.1/config/server.properties文件,
#内置配置:监听器,将内部的默认ip端口更改为,linux 本机的ip 	0.0.0.0 指所有的ip都可以访问,但不建议更改! 只需要把看到的IP都改了即可!
启动

kafka 启动需要两个服务! zookeeper kafka 内部集成了 zookeeper

代码语言:javascript复制
#zookeeper
/usr/local/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.10-0.10.2.1/config/zookeeper.properties
#kafka
/usr/local/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh  -daemon /usr/local/kafka_2.10-0.10.2.1/config/server.properties
测试:

这只是做测试!并不是实际应用!实际应用是程序来写 读的… bin 目录下测试

代码语言:javascript复制
#创建 producer(生产者): 测试生产消息,产生消息, 主题:topic
bin/kafka-console-producer.sh --broker-list localhost:9092 -topic test

#创建 consumer(消费者): 测试消费, 接受消息的 主题:topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 -topic test --from-beginning
  • 条件允许使用 XShell 开启两个窗口,一个 生产者一个消费者…开启测试:
  • 生产者输入数据回车,消费者可以实时的监听到执行!

linux: JPS查看运行线程!

注意:

上面四个服务都要启动,日志收集器才能正常使用。


应用开发Demo:

SpringBoot 项目三板斧:

依赖:

pom.xml

代码语言:javascript复制
<!-- 父依赖 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>
<!-- web 和 kafka依赖! -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 程序只需要向kafka写入日志,内部就会通过 Logstash——Elasticsearch——Kibana展示  -->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

配置:

就是个普通Demo 基于默认,配置一个kafka的配置即可 application.yml

代码语言:javascript复制
spring:
  kafka:
    producer:
      bootstrap-servers: 192.168.1.110:9092                                     #kafka的服务地址!
      key-serializer: org.apache.kafka.common.serialization.StringSerializer    #消息编码方式Serializer 网络传输序列化操作!
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
启动类:

TestRun.Java

代码语言:javascript复制
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestRun {
    public static void main(String[] args) {
        SpringApplication.run(TestRun.class, args);
    }
}
controller

TestController.Java

代码语言:javascript复制
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;      //kafkaTemplate模板对象!

    @GetMapping("/tolog")
    public String tolog() {
        //循环写日志!
        for (int i = 0; i < 10; i  ) {
            //向kafka中写入日志...日志不一定是log 只是一种记录...别死板!学习最怕死板了...
            //                         消息主题         key                 日志数据value
            kafkaTemplate.send("dmservice", "dmservice", "kafa send message to elk--------------"   i);
            //消息主题与,logstash-6.3.0/bin/logstash.conf 中设置的 topics 主题匹配对应!!通过它进行匹配通信!
        }
        return "ok";
    }
}

消息主题与,logstash-6.3.0/bin/logstash.conf 中设置的 topics 主题匹配对应!!通过它进行匹配通信!

测试:

启动程序,请求!
查看Kibana 页面:
查看图形报表:
当然也可以直接看搜索引擎中的数据未图形化数据

ip/9200/_search

异常:

Exception thrown when sending a message with key=‘dmservice’ and payload='ka

  • 查看程序配置的 kafka 端口,主题是否准确!

0 人点赞