数据订阅/发布
在分布式集群中,假设数据库发生了改动,就得修改所有分布式服务的数据库配置
我们可以通过zookeeper来实现数据库配置的订阅发布
我们先初始化数据库配置项环境
在zookeeper配置以下数据
代码语言:javascript复制[zk: localhost:2181(CONNECTED) 51] get /config-server/app1/database
{"Host":"127.0.0.1:3300","User":"root","Password":"233274","Database":"test"}
go代码环境准备
引入
代码语言:javascript复制github.com/go-zookeeper/zk
go.mod内容为:
代码语言:javascript复制module zkStudy
go 1.17
require (
github.com/go-sql-driver/mysql v1.6.0
github.com/go-zookeeper/zk v1.0.2
github.com/jmoiron/sqlx v1.3.4
)
发布数据库配置
我们只需要set path,在zk中将自动把数据发布到订阅此目录的客户端中
以下代码,每2秒更改一次数据库数据
代码语言:javascript复制func loopChangeDbConfig() {
var dbConfig = config.DatabaseConfig{Host: "127.0.0.1:3300",User: "root",Password: "123456",Database: "test"}
t := time.NewTicker(2 * time.Second)
for {
select {
case <-t.C:
dbConfig.Password=strconv.Itoa(rand.Intn(999999) 100000)
jsonByte,_ := json.Marshal(dbConfig)
_,err := zkConnect.Set(databaseZKPath,jsonByte,-1)
if err!=nil {
fmt.Println("zk set dbConfig path err :", err)
return
}
}
}
}
订阅数据库配置
通过zk.getW方法,获取数据并返回一个event单向通道,通过此通道可监听获取一条事件更改数据:
代码语言:javascript复制func getDatabaseConfig() <-chan zk.Event {
//listen mysql-config path
jsonStrByte, _, event, err := zkConnect.GetW(databaseZKPath)
if err != nil {
panic(err)
}
_ = json.Unmarshal(jsonStrByte, &databaseConfig)
fmt.Printf("% v 123n", databaseConfig)
return event
}
获取到event之后,新开协程,进行阻塞获取通道,当获取到数据后,重新获取配置并继续获取一个通道监听数据
代码语言:javascript复制func listenDBConfigChange(event <-chan zk.Event) {
var e zk.Event
for {
e = <-event
//if node data change,db reconnect
if e.Type == zk.EventNodeDataChanged {
fmt.Printf("node data changed: %s, n",e.Path)
event = getDatabaseConfig()
err := db.Close()
if err != nil {
panic(err)
}
connectDb()
dbTest()
}
}
}
运行结果:
本文为仙士可原创文章,转载无需和我联系,但请注明来自仙士可博客www.php20.cn