https://github.com/alibaba/nacos 是阿里开源的服务发现和配置同步组件,上手非常容易,我们介绍下如何部署,然后看下nacos提供的golang sdk:https://github.com/nacos-group/nacos-sdk-go如何使用,分析下具体的源码实现。
代码语言:javascript复制docker run --name nacos-quick -e MODE=standalone -p 8848:8848 -p 9848:9848 -d nacos/nacos-server:2.0.2
Unable to find image 'nacos/nacos-server:2.0.2' locally
2.0.2: Pulling from nacos/nacos-server
9a03b1668b6d: Pull complete
Digest: sha256:ac66d2fbc1ba432beff88beb165e5012686863d72a5e0f25da06e23c5e7b329d
Status: Downloaded newer image for nacos/nacos-server:2.0.2
db9558d41223b12bd58f2c120ead7d506a50bd40327a3fc6518178b27e50dd99
在nacos 1.X的版本中使用http方式来做服务注册和发现,配置主端口(默认8848);在2.0版本支持了grpc 服务发现:9848 是客户端gRPC请求服务端端口,用于客户端向服务端发起连接和请求9849是服务端gRPC请求服务端端口,用于服务间同步等。
我们实现一个服务注册
代码语言:javascript复制curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.naming.serviceName&ip=20.18.7.10&port=8080'
ok
拉取注册结果
代码语言:javascript复制curl -X GET 'http://127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=nacos.naming.serviceName'
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[],"lastRefTime":1667400684240,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
{"name":"DEFAULT_GROUP@@nacos.naming.serviceName","groupName":"DEFAULT_GROUP","clusters":"","cacheMillis":10000,"hosts":[{"instanceId":"20.18.7.10#8080#DEFAULT#DEFAULT_GROUP@@nacos.naming.serviceName","ip":"20.18.7.10","port":8080,"weight":1.0,"healthy":true,"enabled":true,"ephemeral":true,"clusterName":"DEFAULT","serviceName":"DEFAULT_GROUP@@nacos.naming.serviceName","metadata":{},"instanceHeartBeatInterval":5000,"instanceIdGenerator":"simple","ipDeleteTimeout":30000,"instanceHeartBeatTimeOut":15000}],"lastRefTime":1667400719947,"checksum":"","allIPs":false,"reachProtectionThreshold":false,"valid":true}
同样我们也可以使用nacos的配置中心功能,发布配置
代码语言:javascript复制curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=helloWorld"
true
获取配置
代码语言:javascript复制curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"
helloWorld
其实golang的sdk就是基于上述api做的封装来实现服务注册与发现的。
我们定义一个服务
代码语言:javascript复制syntax = "proto3";
import "google/protobuf/empty.proto";
package grpcnacos;
option go_package = ".;grpcnacos";
service Test{
rpc Test(google.protobuf.Empty) returns( TestResponse) {};
}
message TestResponse{
string msg = 1;
}
生成对应的golang代码
代码语言:javascript复制mkdir -p ../pkg/protocol/grpcnacos
protoc --go_out=../pkg/protocol/grpcnacos --go_opt=paths=source_relative --go-grpc_out=../pkg/protocol/grpcnacos --go-grpc_opt=paths=source_relative grpcnacos.proto
定义grpc服务的实现逻辑
代码语言:javascript复制package service
import (
"context"
"learn/learn/Nacos/pkg/protocol/grpcnacos"
"log"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
type Service struct {
grpcnacos.UnimplementedTestServer
}
func (s Service) Test(ctx context.Context, empty *emptypb.Empty) (*grpcnacos.TestResponse, error) {
log.Println("收到一个请求")
return &grpcnacos.TestResponse{Msg: "test"}, nil
}
注册我们的服务
代码语言:javascript复制package main
import (
"fmt"
"learn/learn/Nacos/exp1/service"
"learn/learn/Nacos/pkg/protocol/grpcnacos"
"log"
"net"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"google.golang.org/grpc"
)
func main() {
server := grpc.NewServer()
service := service.Service{}
grpcnacos.RegisterTestServer(server, service)
port := GenFreePort()
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatalf("监听端口:%d失败: %s", port, err.Error())
}
// 创建serverConfig
// 支持多个;至少一个ServerConfig
serverConfig := []constant.ServerConfig{
{
IpAddr: "127.0.0.1",
Port: 8848,
},
}
// 创建clientConfig
clientConfig := constant.ClientConfig{
NamespaceId: "", // 如果需要支持多namespace,我们可以场景多个client,它们有不同的NamespaceId。当namespace是public时,此处填空字符串。
TimeoutMs: 50000,
NotLoadCacheAtStart: true,
LogLevel: "debug",
}
// 创建服务发现客户端的另一种方式 (推荐)
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfig,
},
)
if err != nil {
log.Fatalf("初始化nacos失败: %s", err.Error())
}
success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: "127.0.0.1",
Port: uint64(port),
ServiceName: "test-server",
Weight: 10,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: map[string]string{"name": "test"},
ClusterName: "DEFAULT", // 默认值DEFAULT
GroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUP
})
if err != nil {
log.Fatalf("注册服务失败: %s", err.Error())
}
log.Println("success: ", success)
log.Printf("服务启动成功;PORT:%dn", port)
_ = server.Serve(listen)
}
// GenFreePort 获取一个空闲的端口;端口避免写死,因为要启动多个实例,测试负载均衡
func GenFreePort() int {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
panic(err)
}
listen, err := net.ListenTCP("tcp", addr)
if err != nil {
panic(err)
}
defer listen.Close()
return listen.Addr().(*net.TCPAddr).Port
}
通过名字获取服务的实力,请求获取结果
代码语言:javascript复制package main
import (
"context"
"fmt"
"learn/learn/Nacos/pkg/protocol/grpcnacos"
"log"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/emptypb"
)
func main() {
// 创建serverConfig
// 支持多个;至少一个ServerConfig
serverConfig := []constant.ServerConfig{
{
IpAddr: "127.0.0.1",
Port: 8848,
},
}
// 创建clientConfig
clientConfig := constant.ClientConfig{
// 如果需要支持多namespace,我们可以场景多个client,它们有不同的NamespaceId。当namespace是public时,此处填空字符串。
NamespaceId: "",
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogLevel: "debug",
}
// 创建服务发现客户端的另一种方式 (推荐)
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfig,
},
)
if err != nil {
log.Fatalf("初始化nacos失败: %s", err.Error())
}
// SelectOneHealthyInstance将会按加权随机轮询的负载均衡策略返回一个健康的实例
// 实例必须满足的条件:health=true,enable=true and weight>0
instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
ServiceName: "test-server",
GroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUP
Clusters: []string{"DEFAULT"}, // 默认值DEFAULT
})
log.Printf("获取到的实例IP:%s;端口:%d", instance.Ip, instance.Port)
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", instance.Ip, instance.Port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("监听%s:%d失败:%s", instance.Ip, instance.Port, err.Error())
}
client := grpcnacos.NewTestClient(conn)
res, err := client.Test(context.Background(), &emptypb.Empty{})
if err != nil {
log.Fatalf("调用TestClient失败: %s", err.Error())
}
log.Println(res.Msg)
}
至此我们完成了简单的服务注册和服务发现功能。测试下
代码语言:javascript复制% go run learn/Nacos/exp1/server/main.go
2022/11/04 00:04:38 success: true
2022/11/04 00:04:38 服务启动成功;PORT:56358
2022/11/04 00:04:51 收到一个请求
代码语言:javascript复制 % go run learn/Nacos/exp1/client/main.go
2022/11/04 00:04:51 获取到的实例IP:127.0.0.1;端口:56358
2022/11/04 00:04:51 test
我们可以在页面上看下我们服务的注册情况http://127.0.0.1:8848/nacos/#/login用户名密码都是nacos
可以看到,不论是服务端注册还是客户端拉取,我们首先都需要初始化namingService的客户端,它需要两组参数
代码语言:javascript复制 namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &clientConfig,
ServerConfigs: serverConfig,
},
)
其中clientConfig配置了客户端,也就是我们的应用允许的超时时间等配置,serverConfigs是一组服务端的地址和端口后,也就是我们的nacos服务的地址,可以配置多个实例实现多活。
对于server端来说是通过RegisterInstance来实现服务的注册的
代码语言:javascript复制 success, err := namingClient.RegisterInstance(vo.RegisterInstanceParam{
Ip: "127.0.0.1",
Port: uint64(port),
ServiceName: "test-server",
Weight: 10,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: map[string]string{"name": "test"},
ClusterName: "DEFAULT", // 默认值DEFAULT
GroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUP
})
客户端是通过SelectOneHealthyInstance将会按加权随机轮询的负载均衡策略返回一个健康的实例
代码语言:javascript复制 // 实例必须满足的条件:health=true,enable=true and weight>0
instance, err := namingClient.SelectOneHealthyInstance(vo.SelectOneHealthInstanceParam{
ServiceName: "test-server",
GroupName: "DEFAULT_GROUP", // 默认值DEFAULT_GROUP
Clusters: []string{"DEFAULT"}, // 默认值DEFAULT
})
log.Printf("获取到的实例IP:%s;端口:%d", instance.Ip, instance.Port)
使用起来很简单方便又没有。下面分析下源码实现,注册参数定义如下
代码语言:javascript复制type RegisterInstanceParam struct {
Ip string `param:"ip"` //required
Port uint64 `param:"port"` //required
Weight float64 `param:"weight"` //required,it must be lager than 0
Enable bool `param:"enabled"` //required,the instance can be access or not
Healthy bool `param:"healthy"` //required,the instance is health or not
Metadata map[string]string `param:"metadata"` //optional
ClusterName string `param:"clusterName"` //optional,default:DEFAULT
ServiceName string `param:"serviceName"` //required
GroupName string `param:"groupName"` //optional,default:DEFAULT_GROUP
Ephemeral bool `param:"ephemeral"` //optional
}
注册的时候先生成了服务的实例信息和心跳信息,然后请求nacos服务进行注册
代码语言:javascript复制type Instance struct {
Valid bool `json:"valid"`
Marked bool `json:"marked"`
InstanceId string `json:"instanceId"`
Port uint64 `json:"port"`
Ip string `json:"ip"`
Weight float64 `json:"weight"`
Metadata map[string]string `json:"metadata"`
ClusterName string `json:"clusterName"`
ServiceName string `json:"serviceName"`
Enable bool `json:"enabled"`
Healthy bool `json:"healthy"`
Ephemeral bool `json:"ephemeral"`
}
代码语言:javascript复制type BeatInfo struct {
Ip string `json:"ip"`
Port uint64 `json:"port"`
Weight float64 `json:"weight"`
ServiceName string `json:"serviceName"`
Cluster string `json:"cluster"`
Metadata map[string]string `json:"metadata"`
Scheduled bool `json:"scheduled"`
Period time.Duration `json:"-"`
State int32 `json:"-"`
}
具体动作的执行是通过我们初始化naming客户端的时候指定的proxy agent执行的,默认的agent是一个httpagent
代码语言:javascript复制func (sc *NamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error)
_, err := sc.serviceProxy.RegisterInstance(util.GetGroupName(param.ServiceName, param.GroupName), param.GroupName, instance)
sc.beatReactor.AddBeatInfo(util.GetGroupName(param.ServiceName, param.GroupName), beatInfo)
其中注册实例是直接调用的我们前面提到的服务注册的http接口
代码语言:javascript复制return proxy.nacosServer.ReqApi(constant.SERVICE_PATH, params, http.MethodPost)
SERVICE_BASE_PATH = "/v1/ns"
SERVICE_PATH = SERVICE_BASE_PATH "/instance"
发送心跳是单独启用了一个协程
代码语言:javascript复制 go br.sendInstanceBeat(k, beatInfo)
如果当前实例注销,则进行停止心跳,否则进行心跳通信
代码语言:javascript复制beatInterval, err := br.serviceProxy.SendBeat(beatInfo)
api := constant.SERVICE_BASE_PATH "/instance/beat"
result, err := proxy.nacosServer.ReqApi(api, params, http.MethodPut)
具体调用的是
代码语言:javascript复制SERVICE_BASE_PATH = "/v1/ns"
result, err = server.callServer(api, params, method, getAddress(curServer), curServer.ContextPath)
最终调用的agent实现位于 github.com/nacos-group/nacos-sdk-go@v1.1.2/common/http_agent/http_agent.go
代码语言:javascript复制type HttpAgent struct {
}
代码语言:javascript复制func (agent *HttpAgent) Get
get(path, header, timeoutMs, params)
代码语言:javascript复制func (agent *HttpAgent) RequestOnlyResult
agent.Get
agent.Post
agent.Put
agent.Delete
bytes, errRead := ioutil.ReadAll(response.Body)
代码语言:javascript复制 func (agent *HttpAgent) Request
agent.Get
agent.Post
agent.Put
agent.Delete
其中get实现如下
代码语言:javascript复制func get(path string, header http.Header, timeoutMs uint64, params map[string]string) (response *http.Response, err error)
client := http.Client{}
resp, errDo := client.Do(request)
客户端采用随机策略选取一个实例
代码语言:javascript复制func (sc *NamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error)
service, err := sc.hostReactor.GetServiceInfo(util.GetGroupName(param.ServiceName, param.GroupName), strings.Join(param.Clusters, ","))
return serviceName constant.SERVICE_INFO_SPLITER clusters
其中
代码语言:javascript复制 SERVICE_INFO_SPLITER = "@@"
通过list方法获取服务列表
代码语言:javascript复制 cacheService, ok := hr.serviceInfoMap.Get(key)
hr.updateServiceNow(serviceName, clusters)
result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
api := constant.SERVICE_PATH "/list"
return proxy.nacosServer.ReqApi(api, param, http.MethodGet)
然后解析json
代码语言:javascript复制 SERVICE_BASE_PATH = "/v1/ns"
SERVICE_PATH = SERVICE_BASE_PATH "/instance"
hr.ProcessServiceJson(result)
获取到实例列表后,就通过随机算法选取一个活着的节点
代码语言:javascript复制return sc.selectOneHealthyInstances(service)
for _, host := range hosts {
if host.Healthy && host.Enable && host.Weight > 0 {
cw := int(math.Ceil(host.Weight))
if cw > mw {
mw = cw
}
result = append(result, host)
chooser := newChooser(result)
instance := chooser.pick()
其中选择器定义:
代码语言:javascript复制sort.Sort(instance(instances))
return Chooser{data: instances, totals: totals, max: runningTotal}
选择算法实现:
代码语言:javascript复制instance := chooser.pick()
r := rand.Intn(chs.max) 1
i := sort.SearchInts(chs.totals, r)
return chs.data[i]