通过Logstash复制MySQL数据到ElasticSearch

2022-12-23 08:45:45 浏览数 (1)

docker部署es

代码语言:javascript复制
// docker启动es
docker run -d --name es -p 9200:9200 -p 9300:9300 -v D:workiiodockerFileesdata:usrshareelasticsearch -e "discovery.type=single-node" -m 512M docker.elastic.co/elasticsearch/elasticsearch:7.11.2

参数说明

-d: 后台运行容器,并返回容器ID;

-i: 以交互模式运行容器,通常与 -t 同时使用;

-P: 随机端口映射,容器内部端口随机映射到主机的端口,格式为:主机(宿主)端口:容器端口

-t: 为容器重新分配一个伪输入终端,通常与 -i 同时使用;

–name=“nginx-lb”: 为容器指定一个名称;

-e username=“ritchie”: 设置环境变量;

-m :设置容器使用内存最大值;

–volume , -v: 绑定一个卷

es安装ik分词

下载地址

代码语言:javascript复制
// 与容器交互 es为容器名称(也可以使用容器Id)
docker exec -it es /bin/bash

// 容器内创建目录
mkdir /usr/share/elasticsearch/plugins/ik

// 将文件导入指定容器内目录下 es为容器名称(也可以使用容器Id)
docker cp C:UsersadminDownloadselasticsearch-analysis-ik-7.11.2.zip es:/usr/share/elasticsearch/plugins/ik

// 容器内解压压缩包
unzip elasticsearch-analysis-ik-7.11.2.zip

退出容器,重启容器

测试ik分词

代码语言:javascript复制
GET _analyze?pretty
 
{
 
  "analyzer": "ik_max_word",
 
  "text":"我们是中国人啊"
 
}
代码语言:javascript复制
{
    "tokens": [
        {
            "token": "我们",
            "start_offset": 0,
            "end_offset": 2,
            "type": "CN_WORD",
            "position": 0
        },
        {
            "token": "是",
            "start_offset": 2,
            "end_offset": 3,
            "type": "CN_CHAR",
            "position": 1
        },
        {
            "token": "中国人",
            "start_offset": 3,
            "end_offset": 6,
            "type": "CN_WORD",
            "position": 2
        },
        {
            "token": "中国",
            "start_offset": 3,
            "end_offset": 5,
            "type": "CN_WORD",
            "position": 3
        },
        {
            "token": "国人",
            "start_offset": 4,
            "end_offset": 6,
            "type": "CN_WORD",
            "position": 4
        },
        {
            "token": "啊",
            "start_offset": 6,
            "end_offset": 7,
            "type": "CN_CHAR",
            "position": 5
        }
    ]
}

注意:如果没有生效,进入容器检查ik分词插件是否存在,如果不存在应该是容器挂在本地磁盘异常,具体原因自行百度。

docker部署logstash

logstash和es版本要一致

代码语言:javascript复制
// docker启动logstash
docker run --name logstash -d -p 5044:5044 -v D:workiiodockerFilelogstashdata:usrsharelogstash logstash:7.11.2

修改logstash配置

1、复制mysql驱动到logstash容器中

2、进入logstash容器中修改配置文件

代码语言:javascript复制
1)修改/config/logstash.yml 中的es地址
2)修改/pipeline/logstash.conf 中的相关配置(input、output、filler)
代码语言:javascript复制
input {
	jdbc {
        jdbc_connection_string => "jdbc:mysql://mysql地址:端口/数据库名"
        jdbc_user => "账号"
        jdbc_password => "密码"
        #mysql驱动位置
        jdbc_driver_library => "/usr/share/logstash/driver/mysql-connector-java-8.0.23.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        lowercase_column_names => false
        # statement_filepath => "filename.sql"
        statement => "sql语句"
        #定时器(每分钟执行一次)
        schedule => "* * * * *"
        type => "product"
    }
}

output {

     if[type]=="product"{
        elasticsearch {
            hosts => ["es地址:9200"]
            #manage_template => false
            #template_name => "myik"
            #template => "/usr/share/logstash/template/test_template.json"
            #template_overwrite => true
            #document_id => "%{id}"
            #索引名称
            index => "logstash-test"
        }

     }
}

注意:type关键字可以用于区分数据源,做一些逻辑操作,但是如果查询的数据有type字段,会覆盖掉input中定义的type

0 人点赞