1. 简介#
channel也叫通道,类似于一个队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。channel一般分为无缓存通道和有缓存通道,无缓存通道指缓存为0的channel,有缓存通道指缓存大于0的channel
如下是无缓存通道的示例:
代码语言:javascript复制func TestChannelNoBuffer(t *testing.T) {
ch1 := make(chan string) // 初始化一个缓存为0的通道
go func() {
val1 := <-ch1
fmt.Println(val1)
}()
// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
// 所以goroutine需要写在阻塞这一步的前面
ch1 <- "value"
// Output
// value
}
如下是有缓存通道的示例:
代码语言:javascript复制func TestWithBuffer(t *testing.T) {
ch2 := make(chan string, 2) // 初始化一个缓存为2的通道
for i := 0; i < 2; i {
ch2 <- "value " strconv.Itoa(i)
}
for i := 0; i < 2; i {
temp := <-ch2
fmt.Println(temp)
}
// Output
// value 1
// value 2
}
2. channel实现细节#
先来看一下channel的结构体
代码语言:javascript复制type hchan struct {
qcount uint // channel中元素个数
dataqsiz uint // channel中循环队列的大小
buf unsafe.Pointer // 指向datasiz元素的数组
elemsize uint16
closed uint32
elemtype *_type // 元素类型
sendx uint // 发送操作的下标
recvx uint // 接收操作的下标
recvq waitq // 接收的时候,如果channel缓冲区为空,也没发送者,就把goroutine放到这个链表
sendq waitq // 发送的时候,如果channel缓冲区满了,也没接收者,就把goroutine放到这个链表
lock mutex
}
type waitq struct { // 双向链表
first *sudog
last *sudog
}
2.1 make初始化channel#
make用于初始化channel,如下是make的源码主体部分
代码语言:javascript复制func makechan(t *chantype, size int) *hchan {
elem := t.elem
......
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: // 当创建无缓冲channel时,只会为chan分配一段内存空间
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 当传入的chan类型不是指针类型时,会为chan和buf一次性分配一块连续的内存空间
c = (*hchan)(mallocgc(hchanSize mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 否则会为chan和buf新创建一块内存空间
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 统一更新如下字段
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
......
return c
}
初始化channel一定要用make的方式,不然channel会处于nil的状态,如这种创建方式就会导致channel处于nil状态:var ch chan int
2.2 向chan发送数据 chan <- i#
下图是channel常见的异常总结,对于理解channel源码有一定帮助
在此之前先看该源码的大致逻辑可以更加轻松的理解
如下源码是向channel发送数据时调用的主体部分
代码语言:javascript复制func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil { // 如上图,如果向为nil的channel发送数据,则会阻塞
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
......
lock(&c.lock)
if c.closed != 0 { // 向已经关闭的channel发送数据会panic,
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果存在阻塞的接收者,则直接把值发送给它,绕过发送通道缓冲区
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果前面没有发现阻塞的接收者,则判断channel的缓冲区是否满了,
// 如果没有满,则把发送者放到发送通道缓冲区
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount
unlock(&c.lock)
return true
}
......
// 若上面的缓冲区也满了,则会丢给sendq并阻塞,等待接收者唤醒自己
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep) // 确保值不会被gc掉
// someone woke us up
// 被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
......
}
2.3 从chan接收数据 i := <- chan#
从chan接收数据即把channel里面的值取出来,如下是源码部分,先来看一张逻辑图,便于理解
如下是源码部分
代码语言:javascript复制func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
if c == nil { // 如果channel为nil,则会阻塞
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
......
// 如果存在阻塞的发送者,则直接从这个发送者接收数据
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果接收通道缓冲区没有放满,则把这个接收者放到接收通道缓冲区
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
......
// 如果缓冲区也放满了,则会丢到recvq并阻塞,等待有发送者把自己唤醒
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
// 被唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
......
}
这里有一个点要注意一下,从已关闭的channel取数据是不会panic的,可以正常取值,如果取完了channel里面的值且channel已关闭,仍然取值的话,就会返回类型相应的默认值,如下例:
代码语言:javascript复制func TestChannelNoBuffer(t *testing.T) {
ch1 := make(chan int)
go func() {
time.Sleep(time.Second)
val1 := <-ch1
fmt.Println(val1)
val1 = <-ch1
fmt.Println(val1)
val1 = <-ch1
fmt.Println(val1)
}()
// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
// 所以goroutine需要写在阻塞这一步的前面
ch1 <- 10
close(ch1)
// Output
// 10
// 0
// 0
}
如果从没有关闭的channel取值,当里面的值取完了仍然取值的话,是取不到值的,会造成另外两个goroutine泄漏,如下例:
代码语言:javascript复制func TestChannelNoBuffer(t *testing.T) {
ch1 := make(chan int)
go func() {
time.Sleep(time.Second)
val1 := <-ch1
fmt.Println(val1)
}()
go func() {
time.Sleep(time.Second)
val1 := <-ch1
fmt.Println(val1)
}()
go func() {
time.Sleep(time.Second)
val1 := <-ch1
fmt.Println(val1)
}()
// 如果没有用goroutine去接收通道内的值,这一步将会阻塞
// 所以goroutine需要写在阻塞这一步的前面
ch1 <- 10
//close(ch1)
// Output
// 10
}
2.4 close关闭channel#
如下是channel的关闭操作的源码
代码语言:javascript复制func closechan(c *hchan) {
if c == nil { // 关闭nil的channel会panic
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 { // 关闭已经关闭的channel会panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
......
// 把所有接收队列里阻塞的等待者出队,并放入glist
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 把所有发送队列里阻塞的等待者出队,并放入glist
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 如果glist不为空,则为这些被阻塞的goroutine调用goready函数唤醒这些goroutine
// 并重新对这些goroutine进行调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
3. 生产者-消费者模型#
如下是一个简单的生产者消费者模型
代码语言:javascript复制var wg sync.WaitGroup
func producer(data chan<- int) {
for i := 0; i < 4; i {
data <- i
}
// 这里记得要关闭channel,不然会发生阻塞,因为消费者的数量没有限制,
// 当消费者从空的channel取值的时候会阻塞
close(data)
}
func consumer(data <-chan int) {
defer wg.Done()
for {
v, ok := <-data
if !ok {
break
}
fmt.Println("---:", v, " ===:", ok)
}
}
func main() {
data := make(chan int)
go producer(data)
wg.Add(1)
go consumer(data)
wg.Wait()
}
// Output
// ---: 0 ===: true
// ---: 1 ===: true
// ---: 2 ===: true
// ---: 3 ===: true
4. channel模拟实现消息队列#
由于channel所具有的队列特性,因此可以尝试使用它来模拟实现一个消息队列。需要声明channel具有如下一些特性:
- channel里面的每个数据只能被一个goroutine消费,拿走了就没有了
- channel一般是按照先进先出的方式消费,无法随机消费
关于消息队列的介绍,可以参考我的这篇文章: Title 消息队列学习 | 基础
其实普通的channel使用就是消息队列的队列模型,一边发送,另一边接收,顺序是FIFO,队列模型是早期的消息队列使用的数据结构,本身比较简单,就不做模拟实现,现在来看看如何实现一些其他的模型
3.1 发布订阅模型#
发布订阅模型的基本逻辑如下图:
生产者往Broker里发送消息,然后供多个消费者订阅,此处没有深入到Broker的细节如Topic、Partition等,只是使用channel简单模拟消息队列中生产者和消费者的关系。此处模拟一个消费者发送消息,有多个消费者消费的场景
代码语言:javascript复制type Broker struct {
consumers []*Consumer
}
type Consumer struct {
ch chan string
}
func (b *Broker) produce(msg string) {
// 轮询给消费者发送消息
for _, v := range b.consumers {
v.ch <- msg
}
}
func (b *Broker) subscribe(consumer *Consumer) {
b.consumers = append(b.consumers, consumer)
}
func TestMq1(t *testing.T) {
// 初始化一个Broker节点
b := &Broker{
consumers: make([]*Consumer, 0, 4),
}
// 创建2个消费者
consumer1 := &Consumer{
ch: make(chan string, 1),
}
consumer2 := &Consumer{
ch: make(chan string, 1),
}
// 这2个消费者订阅Broker
b.subscribe(consumer1)
b.subscribe(consumer2)
// 生产者发送一个消息
b.produce("一条消息")
// 2个消费者拿到了刚才生产者发送的消息
fmt.Println(<-consumer1.ch)
fmt.Println(<-consumer2.ch)
// Output
// 一条消息
// 一条消息
}
5. 参考链接#
极客时间go实战训练营
https://www.topgoer.com/并发编程/channel.html
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel