分布式日志处理:
Sleuth实现微服务跟踪
ELK Kafka实现日志收集系统
背景:
主要针对,分布式项目!对多个模块的日志进行管理… 在传统项目中,如果在生产环境中,有多台不同的服务器集群, 如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常低下。
原理:
可以使用 ELK Kafka实现日志收集系统
- 应用系统发送日志信息给Kafka,
- 然后Logstash从Kafka中读取消息在转发到Elasticsearch中保存。
- 最后Kibana从Elasticsearch读取数据并通过web界面展示出来。
Kafka相当于Logstashinput端,这个除了使用Kafka,还可以使用FileRedis等等。
Kafka
- 高吞吐
(海量读写数据,缺点:不支持对象类型传输...)
、分布式消息系统` - 概念:
Producer:生产者(消息的来源)
Consumer:消费者(消息输出)
Topic:主题(消息传递的约定)
- 消息系统介绍
一个消息系统负责将数据从一个应用传递到另外一个应用,
应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。
有两种主要的消息传递模式:
点对点传递模式、
发布-订阅模式。(kafka采用)
ELK: 是三个开源软件的缩写,对应着三个技术:
- E—— Elasticsearch 是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。 它的特点有: 分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
- L—— Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式 一般工作方式为c/s架构: client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
- K—— Kibana
也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的
日志分析友好的 Web 界面
可以帮助汇总、分析和搜索重要数据日志。 - 新增了一个
FileBeat
,它是一个轻量级的日志收集处理工具(Agent)Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
Filebeat隶属于Beats。目前Beats包含四种工具
Packetbeat(搜集网络流量数据) Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据) Filebeat(搜集文件数据) Winlogbeat(搜集 Windows 事件日志数据) Logstash Kibana
Linux环境搭建:
不需要朋友之间跳过即可!应用开发
这里一般公司中,都会有专业的
运维人员来进行搭建,个人学习就顺手学习了整理了笔记!
适合个人学习搭建
- 真实的环境安装也大致如此,当然本人不是专业滴,错误地方希望提醒学习!
- 本人VM虚拟机的,网络连接方式,采用
NAT网络地址转换
不细致讲解了… 如果是Bridged Adapter桥接
可能会麻烦… - 本人喜欢工具都安装在一个目录下, 方便查找:
cd /usrl/local
别忘了关闭防火墙!
Elasticsearch
- elasticsearch-6.2.4.tar.gz 安装文件
- elasticsearch-analysis-ik-6.2.4.zip 安装中文文件
sudo sysctl -w vm.max_map_count=262144 #设置内存!每次启动都要给个运行内存!
tar -zxvf elasticsearch-6.2.4.tar.gz #指定目录下 解压
#解压完成,设置elasticsearch-6.2.4/config/目录下:elasticsearch.yml文件保存这一些配置信息,端口9200...
#linunx 创建一个组 Elasticsearch 不能使用Root用户,需要由一个独立用户!
groupadd elsearch #创建一个组
useradd elsearch -g elsearch -p elasticsearch #设置密码
#解压elasticsearch中文安装包!
#unzip unzip 解压命令! zip压缩包解压,需要下载应用! install 在线安装(需要网络!)!
yum install unzip #下载unzip应用;
#解压目录下文件!
unzip /usr/local/elasticsearch-analysis-ik-6.2.4.zip
#授权用户操作elsearch
chown -R elsearch:elsearch elasticsearch-6.2.4
#创建一个目录, 把中午解压包放进去。 就可以显示中文了!
mkdir -p /usr/local/elasticsearch-6.2.4/plugins/analysis-ik/ #elasticsearch-6.2.4/plugins下创建一个文件analysis-ik
#中文解压文件,直接copy进去
cp -r /usr/local/elasticsearch/* /usr/local/elasticsearch-6.2.4/plugins/analysis-ik/
#Contos还需进行配置:修改 soft nproc 4096
vi /etc/security/limits.d/20-nproc.conf #修改 soft nproc 4096 对应...
vi /etc/security/limits.conf #文件添加:↓↓
* soft nofile 65536
* hard nofile 65536
* soft nproc 4096
* hard nproc 4096
#授权:
chown -R elsearch:elsearch /usr/local/elasticsearch/
启动 Elasticsearch
代码语言:javascript复制# 切换用户
su elsearch
# 刷新
source /etc/profile
# elasticsearch /bin目录下启动
./elasticsearch #启动Elasticsearch
- 根据个人情况使用,不同人的配置用户…都不一样!
- 启动成功显示端口!
Windows 连接测试:
输入Linux 暴漏 ip 端口
注意 :root 配置永久启用空间,就不用没有启动,赋值运行空间!
启动莫名奇妙报错了: ERROR: [1] bootstrap checks failed [1]: max virtual memory areas vm.max_map_
- 切换到root用户修改配置sysctl.conf 执行:
vi /etc/sysctl.conf
- 添加下面配置:
vm.max_map_count=655360
- 并执行命令:
sysctl -p
- 然后,重新启动elasticsearch,即可启动成功。
虽然不太清除怎么回事不过,唉!好了!
Logstash
- logstash-6.3.0.tar.gz
#解压
tar -zxvf logstash-6.3.0.tar.gz
#修改配置,logstash-6.3.0/bin/logstash.conf
#修改配置,logstash-6.3.0/config/logstash.yml
#启动...比较慢!
#/usr/local 是我的路径,注意要根据自己的来!
/usr/local/logstash-6.3.0/bin/logstash -f /usr/local/logstash-6.3.0/bin/logstash.conf
logstash.conf
Logstash从Kafka中读取消息在转发到Elasticsearch中保存。kafka与Elasticsearch 直接通信直接的传输工具!
- input 读取
获取
目标,output写入输出
目标 - 修改写入方
kafka
的 ip 端口 配置,线程,主题… - 输出方
Elasticsearch
的端口配置…
logstash.yml
- logstash 对 Elasticsearch 的ip端口引用依赖!
启动:
有点慢…
Kibana
- kibana-6.2.4-linux-x86_64
#解压
tar -zxvf kibana-6.2.4-linux-x86_64
#配置:Elasticsearch 的ip端口引用依赖! /usr/local/kibana-6.2.4-linux-x86_64/config/kibana.yml 文件!
#更改ip
#bin目录启动:
#/usr/local/kibana-6.2.4-linux-x86_64/bin#
./kibana #启动
kibana.yml
启动:./kibana
Windows查看图形化界面!默认端口5601
kafka
- kafka_2.10-0.10.2.1.tgz
#root 用户下解压:
tar -zxvf kafka_2.10-0.10.2.1.tgz
#配置:kafka_2.10-0.10.2.1/config/server.properties文件,
#内置配置:监听器,将内部的默认ip端口更改为,linux 本机的ip 0.0.0.0 指所有的ip都可以访问,但不建议更改! 只需要把看到的IP都改了即可!
启动
kafka 启动需要两个服务! zookeeper
kafka
内部集成了 zookeeper
#zookeeper
/usr/local/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.10-0.10.2.1/config/zookeeper.properties
#kafka
/usr/local/kafka_2.10-0.10.2.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.10-0.10.2.1/config/server.properties
测试:
这只是做测试!并不是实际应用!实际应用是程序来写 读的… bin 目录下测试
代码语言:javascript复制#创建 producer(生产者): 测试生产消息,产生消息, 主题:topic
bin/kafka-console-producer.sh --broker-list localhost:9092 -topic test
#创建 consumer(消费者): 测试消费, 接受消息的 主题:topic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 -topic test --from-beginning
- 条件允许使用 XShell 开启两个窗口,一个 生产者一个消费者…开启测试:
- 生产者输入数据
回车
,消费者可以实时的监听到执行!
linux: JPS
查看运行线程!
注意:
上面四个服务都要启动,日志收集器
才能正常使用。
应用开发Demo:
SpringBoot 项目三板斧:
依赖:
pom.xml
<!-- 父依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<!-- web 和 kafka依赖! -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 程序只需要向kafka写入日志,内部就会通过 Logstash——Elasticsearch——Kibana展示 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置:
就是个普通Demo 基于默认,配置一个kafka的配置即可
application.yml
spring:
kafka:
producer:
bootstrap-servers: 192.168.1.110:9092 #kafka的服务地址!
key-serializer: org.apache.kafka.common.serialization.StringSerializer #消息编码方式Serializer 网络传输序列化操作!
value-serializer: org.apache.kafka.common.serialization.StringSerializer
启动类:
TestRun.Java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TestRun {
public static void main(String[] args) {
SpringApplication.run(TestRun.class, args);
}
}
controller
TestController.Java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate; //kafkaTemplate模板对象!
@GetMapping("/tolog")
public String tolog() {
//循环写日志!
for (int i = 0; i < 10; i ) {
//向kafka中写入日志...日志不一定是log 只是一种记录...别死板!学习最怕死板了...
// 消息主题 key 日志数据value
kafkaTemplate.send("dmservice", "dmservice", "kafa send message to elk--------------" i);
//消息主题与,logstash-6.3.0/bin/logstash.conf 中设置的 topics 主题匹配对应!!通过它进行匹配通信!
}
return "ok";
}
}
消息主题与,logstash-6.3.0/bin/logstash.conf 中设置的 topics 主题匹配对应!!通过它进行匹配通信!
测试:
启动程序,请求!
查看Kibana 页面:
查看图形报表:
当然也可以直接看搜索引擎中的数据未图形化数据
:
ip/9200/_search
异常:
Exception thrown when sending a message with key=‘dmservice’ and payload='ka
- 查看程序配置的 kafka 端口,主题是否准确!