Background
• Use Monstache to quickly sync and subscribe to full or incremental data.
• Sync MongoDB data to a recent version of Elasticsearch in real time.
• Walk through commonly used Monstache configuration parameters so they can be applied to more business scenarios.
Environment
MongoDB: 5.0.11
Elasticsearch: 7.10.1
Monstache: rel6
I. Set up the Go environment
Monstache is built from source with Go, so Go must be installed before installing Monstache.
1. Download the Go package
wget https://go.dev/dl/go1.17.5.linux-amd64.tar.gz
2. Extract the archive
tar -zxvf go1.17.5.linux-amd64.tar.gz
3. Set the Go environment variable
export PATH=$PATH:/softpackage/go/bin
source /etc/profile
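The export above only affects the current shell; the source /etc/profile step implies the PATH change was also persisted in /etc/profile. A minimal sketch of doing that, assuming the archive was extracted to /softpackage/go:
echo 'export PATH=$PATH:/softpackage/go/bin' >> /etc/profile   # persist for new login shells
source /etc/profile                                            # reload the profile in the current shell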
4. Verify the Go version
go version
II. Install Monstache
Before installing, choose the Monstache release that matches your MongoDB and Elasticsearch versions.
1. Clone the project from GitHub
git clone https://github.com/rwynn/monstache.git
If this reports -bash: git: command not found, install Git first with
yum install git
2. Enter the Monstache directory
cd monstache/
3. Switch to the appropriate Monstache branch (choose it based on the component versions listed above)
git checkout rel6
Because the Elasticsearch cluster here is version 7.10, the rel6 branch of Monstache is used.
4. Install Monstache
go install
5. Check the Monstache version in the installation directory
./bin/monstache -v
A successful installation prints the Monstache version information.
III. Configure the real-time sync task
Manually create a Monstache configuration file in TOML format in the installation directory. By default, Monstache connects to Elasticsearch and MongoDB on localhost via their default ports and tails the MongoDB oplog. While Monstache is running, any change in MongoDB is synced to Elasticsearch.
Because this article uses self-managed MongoDB and Elasticsearch and needs to specify the sync target (the user_info collection in the testdb database), the default Monstache configuration must be modified as follows:
1. Go to the Monstache installation directory and create the configuration file.
cd /root/go/monstache
vim config.toml
2. Modify the configuration file based on the following example. A simple configuration is shown below; for the full set of options, see Monstache Usage.
# connection settings
# connect to MongoDB using the following URL
mongo-url = "mongodb://root:<your_mongodb_password>@IP:27017"
# connect to the Elasticsearch REST API at the following node URLs
elasticsearch-urls = "http://IP:9200"
# frequently required settings
# if you need to seed an index from a collection and not just listen and sync changes events
# you can copy entire collections or views from MongoDB to Elasticsearch
direct-read-namespaces = "testdb.user_info"
# if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
# change streams require at least MongoDB API 3.6
# if you have MongoDB 4 you can listen for changes to an entire database or entire deployment
# in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
# to listen to an entire db use only the database name. For a deployment use an empty string.
#change-stream-namespaces = "mydb.col"
# additional settings
# if you don't want to listen for changes to all collections in MongoDB but only a few
# e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection
# this setting does not initiate a copy, it is only a filter on the change event listener
#namespace-regex = '^mydb.col$'
# compress requests to Elasticsearch
#gzip = true
# generate indexing statistics
#stats = true
# index statistics into Elasticsearch
#index-stats = true
# use the following PEM file for connections to MongoDB
#mongo-pem-file = "/path/to/mongoCert.pem"
# disable PEM validation
#mongo-validate-pem-file = false
# use the following user name for Elasticsearch basic auth
elasticsearch-user = "elastic"
# use the following password for Elasticsearch basic auth
elasticsearch-password = "<your_es_password>"
# use 4 go routines concurrently pushing documents to Elasticsearch
elasticsearch-max-conns = 4
# use the following PEM file for connections to Elasticsearch
#elasticsearch-pem-file = "/path/to/elasticCert.pem"
# validate connections to Elasticsearch
#elastic-validate-pem-file = true
# propagate dropped collections in MongoDB as index deletes in Elasticsearch
dropped-collections = true
# propagate dropped databases in MongoDB as index deletes in Elasticsearch
dropped-databases = true
# do not start processing at the beginning of the MongoDB oplog
# if you set the replay to true you may see version conflict messages
# in the log if you had synced previously. This just means that you are replaying old docs which are already
# in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones.
#replay = false
# resume processing from a timestamp saved in a previous run
resume = true
# do not validate that progress timestamps have been saved
#resume-write-unsafe = false
# override the name under which resume state is saved
#resume-name = "default"
# use a custom resume strategy (tokens) instead of the default strategy (timestamps)
# tokens work with MongoDB API 3.6 while timestamps work only with MongoDB API 4.0
resume-strategy = 0
# exclude documents whose namespace matches the following pattern
#namespace-exclude-regex = '^mydb.ignorecollection$'
# turn on indexing of GridFS file content
#index-files = true
# turn on search result highlighting of GridFS content
#file-highlighting = true
# index GridFS files inserted into the following collections
#file-namespaces = "users.fs.files"
# print detailed information including request traces
verbose = true
# enable clustering mode
cluster-name = 'es-yjd'
# do not exit after full-sync, rather continue tailing the oplog
#exit-after-direct-reads = false
[[mapping]]
namespace = "testdb.user_info"
index = "user_info"
#type = ""
Note: the configuration above uses only a subset of the available parameters to implement real-time data sync. For more complex sync requirements, see Monstache config and Advanced.
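Before starting the task, it can help to confirm that the MongoDB and Elasticsearch endpoints in config.toml are reachable with the configured credentials. A minimal sketch, assuming the mongosh client and curl are available and using the same placeholders as above:
# ping MongoDB with the sync account
mongosh "mongodb://root:<your_mongodb_password>@IP:27017/admin" --eval 'db.runCommand({ ping: 1 })'
# check the Elasticsearch REST endpoint with basic auth
curl -u elastic:<your_es_password> http://IP:9200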
3. Run the task
./bin/monstache -f config.toml
Note: the -f flag explicitly specifies the configuration file to run Monstache with. Because verbose is enabled in the configuration, all debug logs are printed, including request traces to Elasticsearch.
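If the task should keep running after the terminal session ends, one common approach (not part of the original steps) is to start Monstache with nohup and redirect the logs to a file:
# run in the background and capture the verbose output
nohup ./bin/monstache -f config.toml > monstache.log 2>&1 &
# follow the log to watch the sync progress
tail -f monstache.log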
IV. Verify the synced data
MongoDB:
Four test documents were inserted into MongoDB by hand.
db.getCollection("user_info").find().count()
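For reference, test documents of this kind could be inserted in the mongo shell roughly as follows; the field names here are only illustrative, not the article's actual data:
db.getCollection("user_info").insertMany([
  { "name": "user1", "age": 21 },
  { "name": "user2", "age": 22 },
  { "name": "user3", "age": 23 },
  { "name": "user4", "age": 24 }
])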
Elasticsearch:
GET /user_info/_count
The count confirms that the data has been synced to Elasticsearch.
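To check that incremental changes are also picked up (not only the initial direct read), you can modify a document in MongoDB and then search for the change in Elasticsearch. A sketch, where the sync_check field is hypothetical:
// in the mongo shell: tag one document
db.getCollection("user_info").updateOne({}, { $set: { "sync_check": "updated" } })
Shortly afterwards the change should be visible in Elasticsearch:
GET /user_info/_search?q=sync_check:updated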
FAQ 1: During the Monstache installation, go install fails with Get "https://proxy.golang.org/github.com/!burnt!sushi/toml/@v/v1.0.0.mod": dial tcp 172.217.163.49:443: connect: connection refused
This happens because the default GOPROXY address in go env is unreachable from the host, so a reachable Go module proxy must be configured instead.
Run
go env -w GOPROXY=https://goproxy.cn
then re-run
go install
to install Monstache.
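To confirm the proxy setting took effect, print the current value, which should now show the address configured above:
go env GOPROXY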
FAQ 2: Cloning the repository fails with fatal: unable to access 'https://github.com/rwynn/monstache.git/': TCP connection reset by peer
The most likely cause is a slow network combined with a large repository.
Increase Git's HTTP post buffer, for example to roughly 1 GB (1048576000 bytes) or larger:
git config --global http.postBuffer 1048576000
Then clone the repository again.
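If increasing the buffer still does not help, a shallow clone of only the rel6 branch transfers far less data and is sufficient for this setup; this is an alternative workaround, not a step from the original article:
git clone --depth 1 --branch rel6 https://github.com/rwynn/monstache.git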