Codis能够分为clientJodis、代理中间件Codis Proxy、Zookeeper协调、监控界面、Redis定制版Codis Server等组件。这里第一部分主要关注最核心的Proxy部分的源代码。
2.1 程序入口main.go
codis-1.92/cmd/proxy/main.go是Proxy组件的main函数入口,完毕的主要工作就是设置日志级别、解析命令行參数(CPU核数、绑定地址等)、载入配置文件、Golang环境(runtime.GOMAXPROCS并发数)、启动Socket监听等常规任务。顺藤摸瓜,我们要分析的关键应该就在router中。
代码语言:javascript复制func main() {
// 1.打印banner,设置日志级别
fmt.Print(banner)
log.SetLevelByString("info")
// 2.解析命令行參数
args, err := docopt.Parse(usage, nil, true, "codis proxy v0.1", true)
if err != nil {
log.Error(err)
}
if args["-c"] != nil {
configFile = args["-c"].(string)
}
...
dumppath := utils.GetExecutorPath()
log.Info("dump file path:", dumppath)
log.CrashLog(path.Join(dumppath, "codis-proxy.dump"))
// 3.设置Golang并发数等
router.CheckUlimit(1024)
runtime.GOMAXPROCS(cpus)
// 4.启动Http监听
http.HandleFunc("/setloglevel", handleSetLogLevel)
go http.ListenAndServe(httpAddr, nil)
log.Info("running on ", addr)
conf, err := router.LoadConf(configFile)
if err != nil {
log.Fatal(err)
}
// 5.创建Server。启动Socket监听
s := router.NewServer(addr, httpAddr, conf)
s.Run()
log.Warning("exit")
}
2.2 核心类Server
打开codis-1.92/pkg/proxy/router/router.go,在分析请求接收和分发前,先来看一个最核心的类Server,它就是在main.go中调用router.NewServer()时创建的。说一下比較重要的几个字段:
- reqCh:Pipeline请求的Channel。
- pools:Slot与cachepool的map。
- evtbus/top:处理Zookeeper消息,更新拓扑结构。
- bufferedReq:Slot处于migrate期间被缓冲的请求。
- pipeConns:Slot相应的taskrunner。
注意interface{},它表示空interface,依照Golang的Duck Type继承方式。不论什么类都是空接口的子类。所以interface{}有点像C语言中的void*/char*。
由于Codis是先启动监听再開始接收Socket请求,所以对go s.handleTopoEvent()的分析放到后面。
在下一节我们先看一下Codis是怎样启动对Socket端口监听并将接收到的请求放入到Server的reqCh管道中的。
代码语言:javascript复制type Server struct {
slots [models.DEFAULT_SLOT_NUM]*Slot
top *topo.Topology
evtbus chan interface{}
reqCh chan *PipelineRequest
lastActionSeq int
pi models.ProxyInfo
startAt time.Time
addr string
moper *MultiOperator
pools *cachepool.CachePool
counter *stats.Counters
OnSuicide OnSuicideFun
bufferedReq *list.List
conf *Conf
pipeConns map[string]*taskRunner //redis->taskrunner
}
func NewServer(addr string, debugVarAddr string, conf *Conf) *Server {
log.Infof("start with configuration: % v", conf)
// 1.创建Server类
s := &Server{
conf: conf,
evtbus: make(chan interface{}, 1000),
top: topo.NewTopo(conf.productName, conf.zkAddr, conf.f, conf.provider),
counter: stats.NewCounters("router"),
lastActionSeq: -1,
startAt: time.Now(),
addr: addr,
moper: NewMultiOperator(addr),
reqCh: make(chan *PipelineRequest, 1000),
pools: cachepool.NewCachePool(),
pipeConns: make(map[string]*taskRunner),
bufferedReq: list.New(),
}
...
// 2.启动Zookeeper监听器
s.RegisterAndWait()
_, err = s.top.WatchChildren(models.GetWatchActionPath(conf.productName), s.evtbus)
if err != nil {
log.Fatal(errors.ErrorStack(err))
}
// 3.初始化全部Slot的信息
s.FillSlots()
// 4.启动对reqCh和evtbus中事件的监听
go s.handleTopoEvent()
return s
}
2.2.1 Zookeeper通信topology.go
NewServer()中调用的RegisterAndWait()和WatchChildren()都是处理Zookeeper的。
一部分代码在codis-1.92/pkg/proxy/router/topology/topology.go中,一部分底层实如今codis-1.92/pkg/models包下。
这里就不详细分析models包是怎样与Zookeeper通信的了,以免偏离了主题。现阶段。我们仅仅需知道Zookeeper中结点关系(Proxy拓扑结构)的变化都会反映到evtbus管道中即可了。
代码语言:javascript复制func (s *Server) RegisterAndWait() {
_, err := s.top.CreateProxyInfo(&s.pi)
if err != nil {
log.Fatal(errors.ErrorStack(err))
}
_, err = s.top.CreateProxyFenceNode(&s.pi)
if err != nil {
log.Warning(errors.ErrorStack(err))
}
s.registerSignal()
s.waitOnline()
}
func (top *Topology) WatchChildren(path string, evtbus chan interface{}) ([]string, error) {
content, _, evtch, err := top.zkConn.ChildrenW(path)
if err != nil {
return nil, errors.Trace(err)
}
// 启动监听器。监听Zookeeper事件
go top.doWatch(evtch, evtbus)
return content, nil
}
func (top *Topology) doWatch(evtch <-chan topo.Event, evtbus chan interface{}) {
e := <-evtch
if e.State == topo.StateExpired || e.Type == topo.EventNotWatching {
log.Fatalf("session expired: % v", e)
}
log.Warningf("topo event % v", e)
switch e.Type {
//case topo.EventNodeCreated:
//case topo.EventNodeDataChanged:
case topo.EventNodeChildrenChanged: //only care children changed
//todo:get changed node and decode event
default:
log.Warningf("% v", e)
}
// 将Zookeeper结点变化的事件放入Server的evtbus管道中
evtbus <- e
}
2.2.2 初始化槽信息fillSlots()
Codis将Redis服务器依照Group划分。每一个Group就是一个Master以及至少一个Slave。也就是说每一个Group都相应哈希散列的一个Slot。
fillSlots()从ZooKeeper中取出注冊的Redis后端信息。初始化每一个Slot(默认1024个):包含Slot状态、Group信息等。
代码语言:javascript复制func (s *Server) FillSlots() {
// 为全部默认1024个Slot初始化信息
for i := 0; i < models.DEFAULT_SLOT_NUM; i {
s.fillSlot(i, false)
}
}
func (s *Server) fillSlot(i int, force bool) {
s.clearSlot(i)
// 1.获得当前Slot的信息和Group信息
slotInfo, groupInfo, err := s.top.GetSlotByIndex(i)
slot := &Slot{
slotInfo: slotInfo,
dst: group.NewGroup(*groupInfo),
groupInfo: groupInfo,
}
// 2.创建Slot相应的cachepool
s.pools.AddPool(slot.dst.Master())
if slot.slotInfo.State.Status == models.SLOT_STATUS_MIGRATE {
//get migrate src group and fill it
from, err := s.top.GetGroup(slot.slotInfo.State.MigrateStatus.From)
if err != nil { //todo: retry ?
log.Fatal(err)
}
slot.migrateFrom = group.NewGroup(*from)
s.pools.AddPool(slot.migrateFrom.Master())
}
s.slots[i] = slot
s.counter.Add("FillSlot", 1)
}
codis-1.92/pkg/proxy/cachepool/cachepool.go和codis-1.92/pkg/proxy/redispool/redispool.go中负责创建与Redis通信的连接池。
代码语言:javascript复制type LivePool struct {
pool redispool.IPool
}
type CachePool struct {
mu sync.RWMutex
pools map[string]*LivePool
}
func (cp *CachePool) AddPool(key string) error {
// 1.锁住cachepool
cp.mu.Lock()
defer cp.mu.Unlock()
// 2.查找当前Slot的连接池
pool, ok := cp.pools[key]
if ok {
return nil
}
// 3.若不存在则新建LivePool
pool = &LivePool{
//pool: redispool.NewConnectionPool("redis conn pool", 50, 120*time.Second),
pool: NewSimpleConnectionPool(),
}
// 4.打开连接
pool.pool.Open(redispool.ConnectionCreator(key))
// 5.保存新建好的连接池
cp.pools[key] = pool
return nil
}
2.3 请求接收router.go(1)
以下继续跟踪主流程,main()方法在调用NewServer()创建出Server实例后。调用了其Run()方法。
Run()是标准的服务端代码。首先net.Listen()绑定到端口上监听,然后进入死循环Accept(),每接收到一个连接就启动一个goroutine进行处理。
代码语言:javascript复制func (s *Server) Run() {
log.Infof("listening %s on %s", s.conf.proto, s.addr)
listener, err := net.Listen(s.conf.proto, s.addr)
...
for {
conn, err := listener.Accept()
if err != nil {
log.Warning(errors.ErrorStack(err))
continue
}
go s.handleConn(conn)
}
}
handleConn()接收到client的连接,完毕三件事儿:
- 创建session对象:保存当前client的Socket连接、读写缓冲区、响应Channel等。
- 启动响应goroutine:client.WritingLoop()中处理backQ中的响应数据。
- 建立Redis连接:server.redisTunnel()中打开连接。读取client请求并转发给Redis处理。
func (s *Server) handleConn(c net.Conn) {
log.Info("new connection", c.RemoteAddr())
s.counter.Add("connections", 1)
// 1.创建当前client的Session实例
client := &session{
Conn: c,
r: bufio.NewReaderSize(c, 32*1024),
w: bufio.NewWriterSize(c, 32*1024),
CreateAt: time.Now(),
backQ: make(chan *PipelineResponse, 1000),
closeSignal: &sync.WaitGroup{},
}
client.closeSignal.Add(1)
// 2.启动监视backQ写回响应的子routine
go client.WritingLoop()
...
// 3.循环读取该client的请求并处理
for {
err = s.redisTunnel(client)
if err != nil {
close(client.backQ)
return
}
client.Ops
}
}
redisTunnel能够说是Proxy服务端的“代码中枢”了,最核心的代码都是在这里共同协作完毕任务的,它调用三个最为关键的函数:
- getRespOpKeys()解析请求:在helper.go中。托付parser.go解析client请求。 此处对多參数的请求比如hmset进行特殊处理,由于key可能相应多个后端Redis实例。假设是单參数,则能够Pipeline化发送给后端。
- mapKey2Slot()哈希映射:在mapper.go中。计算key应该分配到哪台Redis服务器的Slot中。
- PipelineRequest()创建Pipeline请求:依据前面得到的数据新建PipelineRequest。并发送到当前clientSession中的Channel中。之后调用pr.wg.Wait(),当前go s.handleConn()创建的goroutine休眠等待响应。
func (s *Server) redisTunnel(c *session) error {
resp, op, keys, err := getRespOpKeys(c)
k := keys[0]
...
if isMulOp(opstr) {
if len(keys) > 1 { //can not send to redis directly
var result []byte
err := s.moper.handleMultiOp(opstr, keys, &result)
if err != nil {
return errors.Trace(err)
}
s.sendBack(c, op, keys, resp, result)
return nil
}
}
i := mapKey2Slot(k)
//pipeline
c.pipelineSeq
pr := &PipelineRequest{
slotIdx: i,
op: op,
keys: keys,
seq: c.pipelineSeq,
backQ: c.backQ,
req: resp,
wg: &sync.WaitGroup{},
}
pr.wg.Add(1)
s.reqCh <- pr
pr.wg.Wait()
return nil
}
2.3.1 RESP协议解析parser.go
redisTunnel()调用了helper.go中的getRespOpKeys(),后者使用parser.go解析RESP协议请求,从Parse()函数的代码中能清晰地看到对RESP五种通信格式’-‘,’ ’,’:’,’$’。’*’。由于要依据请求中的命令和key做路由,以及特殊处理(比如多參数命令)。所以Codis不能简单地透传,而是解析协议获得所需的信息。
注意parser.Parse()的使用方法。这里parser是包名不是一个对象实例,而Parse是parser包中的一个public函数。所以乍看之下有点困惑了,这也是Golang支持既像C一样面向过程编程,又有高级语言的面向对象甚至Duck Type的缘故。
Parse()读取网络流。并递归处理整个请求。比如”GET ab”命令:
代码语言:javascript复制 *2rn
$3rn
GETrn
$2rn
abrn
终于Parse()返回时得到:
代码语言:javascript复制 Resp{
Raw: "*2rn",
Multi{
Resp{ Raw: "$3rnGETrn" },
Resp{ Raw: "$2rnabrn" }
}
}
假设仔细分析的话,readLine()中使用readSlice()读取缓冲区的切片,节约了内存。
这样的设计上的小细节还是非常值得关注和学习的。毕竟“天下大事,必作于细”。
代码语言:javascript复制func getRespOpKeys(c *session) (*parser.Resp, []byte, [][]byte, error) {
resp, err := parser.Parse(c.r) // read client request
op, keys, err := resp.GetOpKeys()
...
return resp, op, keys, nil
}
type Resp struct {
Type int
Raw []byte
Multi []*Resp
}
func Parse(r *bufio.Reader) (*Resp, error) {
line, err := readLine(r)
if err != nil {
return nil, errors.Trace(err)
}
resp := &Resp{}
if line[0] == '$' || line[0] == '*' {
resp.Raw = make([]byte, 0, len(line) 64)
} else {
resp.Raw = make([]byte, 0, len(line))
}
resp.Raw = append(resp.Raw, line...)
switch line[0] {
case '-':
resp.Type = ErrorResp
return resp, nil
case ' ':
resp.Type = SimpleString
return resp, nil
case ':':
resp.Type = IntegerResp
return resp, nil
case '$':
resp.Type = BulkResp
...
case '*':
resp.Type = MultiResp
...
}
2.3.2 哈希映射mapper.go
mapKey2Slot()处理HashTag,并使用CRC32计算哈希值。
代码语言:javascript复制const (
HASHTAG_START = '{'
HASHTAG_END = '}'
)
func mapKey2Slot(key []byte) int {
hashKey := key
//hash tag support
htagStart := bytes.IndexByte(key, HASHTAG_START)
if htagStart >= 0 {
htagEnd := bytes.IndexByte(key[htagStart:], HASHTAG_END)
if htagEnd >= 0 {
hashKey = key[htagStart 1 : htagStart htagEnd]
}
}
return int(crc32.ChecksumIEEE(hashKey) % models.DEFAULT_SLOT_NUM)
}
2.4 请求分发router.go(2)
NewServer()中运行go s.handleTopoEvent()启动goroutine。对Server数据结构中的reqCh和evtbus两个Channel进行事件监听处理。这里重点看拿到reqCh的事件后是怎样dispatch()的。reqCh的事件也就是PipelineRequest,会经dispath()函数放入相应Slot的taskrunner的in管道中。也就是说,reqCh中的请求会被分发到各个Slot自己的Channel中。
另外注意:此处会检查PipelineRequest相应Slot的状态。假设正在migrate,则临时将请求缓冲到Server类的bufferedReq链表中。
代码语言:javascript复制func (s *Server) handleTopoEvent() {
for {
select {
// 1.处理Server.reqCh中事件
case r := <-s.reqCh:
// 1.1 假设正在migrate。则将请求r临时缓冲起来
if s.slots[r.slotIdx].slotInfo.State.Status == models.SLOT_STATUS_PRE_MIGRATE {
s.bufferedReq.PushBack(r)
continue
}
// 1.2 处理缓冲中的请求e
for e := s.bufferedReq.Front(); e != nil; {
next := e.Next()
s.dispatch(e.Value.(*PipelineRequest))
s.bufferedReq.Remove(e)
e = next
}
// 1.3 处理当前请求r
s.dispatch(r)
// 2.处理Server.evtbus中请求
case e := <-s.evtbus:
switch e.(type) {
case *killEvent:
s.handleMarkOffline()
e.(*killEvent).done <- nil
default:
evtPath := GetEventPath(e)
...
s.processAction(e)
}
}
}
}
func (s *Server) dispatch(r *PipelineRequest) {
s.handleMigrateState(r.slotIdx, r.keys[0])
// 1.查找Slot相应的taskrunner
tr, ok := s.pipeConns[s.slots[r.slotIdx].dst.Master()]
// 2.若没有。则新建一个taskrunner
if !ok {
// 2.1 新建tr时出错。则向r.backQ放入一个空响应
if err := s.createTaskRunner(s.slots[r.slotIdx]); err != nil {
r.backQ <- &PipelineResponse{ctx: r, resp: nil, err: err}
return
}
// 2.2 拿到taskrunner
tr = s.pipeConns[s.slots[r.slotIdx].dst.Master()]
}
// 3.将请求r放入in管道
tr.in <- r
}
taskrunner.go的createTaskRunner()调用NewTaskRunner()创建当前Slot相应的taskrunner。每一个taskrunner都拥有一对in和out管道。之前的PipelineRequest就是放到in管道中。然后启动了两个goroutine,分别调用writeloop()和readloop()函数监听in和out管道,处理当中的请求。
代码语言:javascript复制func (s *Server) createTaskRunner(slot *Slot) error {
dst := slot.dst.Master()
if _, ok := s.pipeConns[dst]; !ok {
tr, err := NewTaskRunner(dst, s.conf.netTimeout)
if err != nil {
return errors.Errorf("create task runner failed, %v, % v, % v", err, slot.dst, slot.slotInfo)
} else {
s.pipeConns[dst] = tr
}
}
return nil
}
func NewTaskRunner(addr string, netTimeout int) (*taskRunner, error) {
// 1.创建TaskRunner实例
tr := &taskRunner{
in: make(chan interface{}, 1000),
out: make(chan interface{}, 1000),
redisAddr: addr,
tasks: list.New(),
netTimeout: netTimeout,
}
// 2.创建Redis连接,并绑定到tr
c, err := redisconn.NewConnection(addr, netTimeout)
tr.c = c
// 3.開始监听读写管道in和out
go tr.writeloop()
go tr.readloop()
return tr, nil
}
func (tr *taskRunner) writeloop() {
var err error
tick := time.Tick(2 * time.Second)
for {
...
select {
// 1.处理in管道中来自client的请求
case t := <-tr.in:
tr.processTask(t)
// 2.处理out管道中来自Redis的响应
case resp := <-tr.out:
err = tr.handleResponse(resp)
// 设置select间隔
case <-tick:
if tr.tasks.Len() > 0 && int(time.Since(tr.latest).Seconds()) > tr.netTimeout {
tr.c.Close()
}
}
}
}
2.5 请求发送taskrunner.go
终于到了请求的生命周期的最后一个环节了!writeloop()会不断调用processTask()处理in管道中的请求,通过dowrite()函数发送到Redis服务端。当in管道中没有其它请求时,会强制刷新一下缓冲区。
代码语言:javascript复制func (tr *taskRunner) processTask(t interface{}) {
var err error
switch t.(type) {
case *PipelineRequest:
r := t.(*PipelineRequest)
var flush bool
if len(tr.in) == 0 { //force flush
flush = true
}
err = tr.handleTask(r, flush)
case *sync.WaitGroup: //close taskrunner
err = tr.handleTask(nil, true) //flush
...
}
...
}
func (tr *taskRunner) handleTask(r *PipelineRequest, flush bool) error {
if r == nil && flush { //just flush
return tr.c.Flush()
}
// 1.将请求保存到链表,接收到响应时再移除
tr.tasks.PushBack(r)
tr.latest = time.Now()
// 2.发送请求到Redis
return errors.Trace(tr.dowrite(r, flush))
}
type Resp struct {
Type int
Raw []byte
Multi []*Resp
}
func (tr *taskRunner) dowrite(r *PipelineRequest, flush bool) error {
// 1.通过Bytes()函数取出Resp中的原始字节Raw
b, err := r.req.Bytes()
...
// 2.将原始请求发送到Redis服务端
_, err = tr.c.Write(b)
...
// 3.假设须要,强制刷新缓冲区
if flush {
return errors.Trace(tr.c.Flush())
}
return nil
}
Codis使用Golang的bufio库处理底层的IO流读写操作。
在NewConnection()中,用net包创建到Redis的Socket连接,并分别创建大小为512K的读写缓冲流。
代码语言:javascript复制//not thread-safe
type Conn struct {
addr string
net.Conn
closed bool
r *bufio.Reader
w *bufio.Writer
netTimeout int //second
}
func NewConnection(addr string, netTimeout int) (*Conn, error) {
// 1.打开到Redis服务端的TCP连接
conn, err := net.DialTimeout("tcp", addr, time.Duration(netTimeout)*time.Second)
...
// 2.创建Conn实例。及读写缓冲区
return &Conn{
addr: addr,
Conn: conn,
r: bufio.NewReaderSize(conn, 512*1024),
w: bufio.NewWriterSize(deadline.NewDeadlineWriter(conn, time.Duration(netTimeout)*time.Second), 512*1024),
netTimeout: netTimeout,
}, nil
}
func (c *Conn) Flush() error {
return c.w.Flush()
}
func (c *Conn) Write(p []byte) (int, error) {
return c.w.Write(p)
}
2.6 返回响应session.go
当writeloop()在“如火如荼”地向Redis发送请求时。readloop()也没有闲着。它不断地从Redis读取响应。发送每一个请求时都不会等待Redis的响应。也就是说发送请求和读取响应全然是异步进行的。所以就充分利用了Pipeline的性能优势。
代码语言:javascript复制func (tr *taskRunner) readloop() {
for {
// 1.从Redis连接中读取响应
resp, err := parser.Parse(tr.c.BufioReader())
if err != nil {
tr.out <- err
return
}
// 2.将解析好的响应放入out管道中
tr.out <- resp
}
}
func (tr *taskRunner) handleResponse(e interface{}) error {
switch e.(type) {
...
case *parser.Resp:
// 1.取到out管道中的PipelineResponse
resp := e.(*parser.Resp)
// 2.取出相应的PipelineRequest
e := tr.tasks.Front()
req := e.Value.(*PipelineRequest)
// 3.将响应放入到backQ管道中(req.backQ也就是session中的backQ)
req.backQ <- &PipelineResponse{ctx: req, resp: resp, err: nil}
// 4.从任务列表中移除已拿到响应的请求
tr.tasks.Remove(e)
return nil
}
return nil
}
由于writeloop()不仅监视in管道,也监视out管道。所以writeloop()会将readloop()放入的响应交给handleResponse()处理。终于PipelineResponse被放入Session对象的backQ管道中。还记得它吗?在最開始NewServer时为当前client创建的Session实例。最后,接收到的PipleResponse会转成RESP协议的字节序列,发送回client。
代码语言:javascript复制func (s *session) WritingLoop() {
s.lastUnsentResponseSeq = 1
for {
select {
case resp, ok := <-s.backQ:
if !ok {
s.Close()
s.closeSignal.Done()
return
}
flush, err := s.handleResponse(resp)
...
}
}
}
func (s *session) handleResponse(resp *PipelineResponse) (flush bool, err error) {
...
if !s.closed {
if err := s.writeResp(resp); err != nil {
return false, errors.Trace(err)
}
flush = true
}
return
}
func (s *session) writeResp(resp *PipelineResponse) error {
// 1.取出Resp中的原始字节
buf, err := resp.resp.Bytes()
if err != nil {
return errors.Trace(err)
}
// 2.写回到client
_, err = s.Write(buf)
return errors.Trace(err)
}
//write without bufio
func (s *session) Write(p []byte) (int, error) {
return s.w.Write(p)
}
2.7 Proxy源代码流程总结
最后以一张Proxy的流程图作结束。经过我们的分析能够看出,关于并发安全方面。Codis唯一须要并发控制的地方就是从reqCh分发到各个Slot的Channel,为了避免竞争,这一部分是由一个goroutine完毕的。