我们在前面介绍了 nsq 的相关概念以及 nsq 的安装与应用以及 nsqd 的实现原理、nsqlookupd 的实现细节。
本文将会介绍 nsq 在设计方面的一些思路。
设计概述
从源码可以看到,nsqd 的作用就是实际工作的组件,生产者 producer、消费者 consumer 利用 nsqlookupd 获取最新可用的节点,当连接上对应的 Topic/Channel 后,将消息 message 发送到客户端进行消费,处理成功则 FIN(finish),或失败/超时后重新放回队列 REQ(requeue),待下一次再消费处理。nsqlookupd 的作用就是管理 nsqd 节点的认证、注册、注销、心跳检测,动态维护分布式集群中最新可用的 nsqd 节点列表供客户端取用。
在可靠性、有序性方便, nsq 保证消息至少被投递消费一次(幂等消费),当某个 nsqd 节点出现故障时,极端情况下内存里面的消息还未来得及存入磁盘,这部分消息将丢失;通过分布式多个 consumer 消费,会因为消息处理时长、网络延迟等导致消息重排,再次消费顺序与写入顺序不一致,因此在高可靠性、顺序性方面略存在不足,应根据具体的业务场景进行取舍。
源代码实现逻辑清晰明了,源码中使用了很多读写锁 RWMutex、原子值 atomic.Value、interface 接口复用、自定义通信协议 protocol、http-decorator装饰器、goroutine/channel 协程间并发通信,优先从内存( msqChan )存取消息,从而保证了高可用、高吞吐量的应用能力。快速高效的节点配置与扩展,配合容器云编排技术,可以高效实现集群的 scale 化。
下面我们一起来看下其中实现的精巧之处。
锁与原子操作 RWMutex/atomic.Value
从下面的代码中可以看到,当需要获取一个 topic 的时候,先用读锁去读(此时如果有写锁将被阻塞),若存在则直接返回,若不存在则使用写锁新建一个;另外,使用 atomic.Value 进行结构体某些字段的并发存取值,保证原子性。
代码语言:javascript复制func (n *NSQD) GetTopic(topicName string) *Topic {
// most likely, we already have this topic, so try read lock first.
n.RLock()
t, ok := n.topicMap[topicName]
n.RUnlock()
if ok {
return t
}
n.Lock()
t, ok = n.topicMap[topicName]
if ok {
n.Unlock()
return t
}
deleteCallback := func(t *Topic) {
n.DeleteExistingTopic(t.name)
}
t = NewTopic(topicName, &context{n}, deleteCallback)
n.topicMap[topicName] = t
n.Unlock()
}
消息多路分发与负载均衡
Topic 和 Channel 都没有预先配置。Topic 由第一次发布消息到命名的 Topic 或第一次通过订阅一个命名 Topic 来创建。Channel 被第一次订阅到指定的 Channel 创建。Topic 和 Channel 的所有缓冲的数据相互独立,防止缓慢消费者造成对其他 Channel 的积压(同样适用于 Topic 级别)。
多路分发 - producer 会同时连上 nsq 集群中所有 nsqd 节点,当然这些节点的地址是在初始化时,通过外界传递进去;当发布消息时,producer 会随机选择一个 nsqd 节点发布某个 Topic 的消息;consumer 在订阅 subscribe 某个Topic/Channel时,会首先连上 nsqlookupd 获取最新可用的 nsqd 节点,然后通过 TCP 长连接方式连上所有发布了指定 Topic 的 producer 节点,并在本地用 tornado 轮询每个连接,当某个连接有可读事件时,即有消息达到,处理即可。
负载均衡 - 当向某个 Topic 发布一个消息时,该消息会被复制到所有的 Channel,如果 Channel 只有一个客户端,那么 Channel 就将消息投递给这个客户端;如果 Channel 的客户端不止一个,那么 Channel 将把消息随机投递给任何一个客户端,这也可以看做是客户端的负载均衡
小结
本文主要介绍 nsq 的部分功能设计思路。除了我所介绍的锁与原子操作、消息多路分发,还有诸如队列设计中的优先级队列以及延时队列等。将会在下一篇文章继续介绍其他设计实现的细节。