channel

2023-11-30 23:26:19 浏览数 (3)

0. channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go 语言的并发模型是 CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。

1. channel 类型

channel 是一种类型,一种引用类型。声明通道类型的格式如下:

代码语言:txt复制
 var 变量 chan 元素类型

举几个例子:

代码语言:go复制
var ch1 chan int   
var ch2 chan bool  
var ch3 chan []int

2. 创建 channel

通道是引用类型,通道类型的空值是 nil。

代码语言:txt复制
var ch chan int
fmt.Println(ch)

声明的通道后需要使用 make 函数初始化之后才能使用。

创建 channel 的格式如下:

代码语言:txt复制
 make(chan 元素类型, [缓冲大小])

channel 的缓冲大小是可选的。

举几个例子:

代码语言:txt复制
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)

如果使用 channel 之前没有 make,会出现 dead lock 错误。

代码语言:go复制
func main() {
    var x chan int
    go func() {
        x <- 1
    }()
    <-x
}

1.1.4. channel 操作

通道有发送(send)、接收 (receive)和关闭(close)三种操作。

发送和接收都使用 <- 符号。

现在我们先使用以下语句定义一个通道:

代码语言:txt复制
ch := make(chan int)
发送

将一个值发送到通道中。

代码语言:txt复制
ch <- 10
接收

从一个通道中接收值。

代码语言:txt复制
x := <- ch 
<-ch
关闭

我们通过调用内置的 close 函数来关闭通道。

代码语言:txt复制
 close(ch)

关于关闭通道需要注意的事情是,只有在通知接收方 goroutine 所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

代码语言:go复制
对一个关闭的通道再发送值就会导致panic。
对一个关闭的通道进行接收会一直获取值直到通道为空。
对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
关闭一个已经关闭的通道会导致panic。

同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。

1.1.5. 无缓冲的通道

无缓冲的通道又称为阻塞的通道。我们来看一下下面的代码:

代码语言:txt复制
func main() {
    ch := make(chan int)
    ch <- 10
    fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

代码语言:txt复制
 fatal error: all goroutines are asleep - deadlock!

    goroutine 1 [chan send]:
    main.main()
            .../src/github.com/pprof/studygo/day06/channel02/main.go:8  0x54

为什么会出现 deadlock 错误呢?

因为我们使用 ch := make(chan int) 创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值。就像你住的小区没有快递柜和代收点,快递员给你打电话必须要把这个物品送到你的手中,简单来说就是无缓冲的通道必须有接收才能发送。

上面的代码会阻塞在 ch <- 10 这一行代码形成死锁,那如何解决这个问题呢?

一种方法是启用一个 goroutine 去接收值,例如:

代码语言:txt复制
func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) 
    ch <- 10
    fmt.Println("发送成功")
}

无缓冲通道上的发送操作会阻塞,直到另一个 goroutine 在该通道上执行接收操作,这时值才能发送成功,两个 goroutine 将继续执行。相反,如果接收操作先执行,接收方的 goroutine 将阻塞,直到另一个 goroutine 在该通道上发送一个值。

使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此,无缓冲通道也被称为同步通道。

序中必须同时有不同的 goroutine 对非缓冲通道进行发送和接收操作,否则会造成阻塞。

1.1.6. 有缓冲的通道

解决上面问题的方法还有一种就是使用有缓冲区的通道。

我们可以在使用 make 函数初始化通道的时候为其指定通道的容量,例如:

代码语言:txt复制
func main() {
    ch := make(chan int, 1) 
    ch <- 10
    fmt.Println("发送成功")
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

我们可以使用内置的 len 函数获取通道内元素的数量,使用 cap 函数获取通道的容量,虽然我们很少会这么做。

一个有缓冲 channel 具备以下特点:

  1. 有缓冲 channel 的内部有一个缓冲队列;
  2. 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间;
  3. 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素。

1.1.7. close()

可以通过内置的 close() 函数关闭 channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

代码语言:txt复制
package main

import "fmt"

func main() {
    c := make(chan int)
    go func() {
        for i := 0; i < 5; i   {
            c <- i
        }
        close(c)
    }()
    for {
        if data, ok := <-c; ok {
            fmt.Println(data)
        } else {
            break
        }
    }
    fmt.Println("main结束")
}
  1. 关闭一个 closed channel 会导致 panic
  2. 向一个 closed channel 发送数据会导致 panic

有一条广泛流传的关闭 channel 的原则:

don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.

不要从一个 receiver 侧关闭 channel,也不要在有多个 sender 时,关闭 channel。

到底应该如何优雅地关闭 channel?

根据 sender 和 receiver 的个数,分下面几种情况:

  1. 一个 sender,一个 receiver
  2. 一个 sender, M 个 receiver
  3. N 个 sender,一个 reciver
  4. N 个 sender, M 个 receiver

对于 1,2,只有一个 sender 的情况就不用说了,直接从 sender 端关闭就好了

针对第三种情况

解决方案就是增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。代码如下:

代码语言:go复制
func main() {
	rand.Seed(time.Now().UnixNano())

	const Max = 100000
	const NumSenders = 1000

	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})

	// senders
	for i := 0; i < NumSenders; i   {
		go func() {
			for {
				select {
				case <-stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// the receiver
	go func() {
		for value := range dataCh {
			if value == Max-1 {
				fmt.Println("send stop signal to senders.")
				close(stopCh)
				return
			}

			fmt.Println(value)
		}
	}()

	select {
	case <-time.After(time.Hour):
	}
}

这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。

在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。

针对第四种情况

需要增加一个中间人,M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。

代码语言:go复制
func main() {
	rand.Seed(time.Now().UnixNano())

	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})

	// It must be a buffered channel.
	toStop := make(chan string, 1)

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i   {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					select {
					case toStop <- "sender#"   id:
					default:
					}
					return
				}

				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i   {
		go func(id string) {
			for {
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						select {
						case toStop <- "receiver#"   id:
						default:
						}
						return
					}

					fmt.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	select {
	case <- time.After(time.Hour):
	}

}

代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。

这里将 toStop 声明成了一个 缓冲型的 channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 dataCh 的请求就会丢失。

1.1.8. 如何优雅的从通道循环取值

当通过通道发送有限的数据时,我们可以通过 close 函数关闭通道来告知从该通道接收值的 goroutine 停止等待。当通道被关闭时,往该通道发送值会引发 panic,从该通道里接收的值一直都是类型零值。那如何判断一个通道是否被关闭了呢?

我们来看下面这个例子:

代码语言:txt复制
 func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    
    go func() {
        for i := 0; i < 100; i   {
            ch1 <- i
        }
        close(ch1)
    }()
    
    go func() {
        for {
            i, ok := <-ch1 
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    
    for i := range ch2 { 
        fmt.Println(i)
    }
}

从上面的例子中我们看到有两种方式在接收值的时候判断通道是否被关闭,我们通常使用的是 for range 的方式。

1.1.9. 单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如限制通道在函数中只能发送或只能接收。

Go 语言中提供了单向通道来处理这种情况。例如,我们把上面的例子改造如下:

代码语言:txt复制
func counter(out chan<- int) {
    for i := 0; i < 100; i   {
        out <- i
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for i := range in {
        out <- i * i
    }
    close(out)
}
func printer(in <-chan int) {
    for i := range in {
        fmt.Println(i)
    }
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2, ch1)
    printer(ch2)
}

其中,

代码语言:go复制
 1.chan<- int是一个只能发送的通道,可以发送但是不能接收;
 2.<-chan int是一个只能接收的通道,可以接收但是不能发送。

在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

1.1.10.channel 退出

代码语言:go复制
func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	stopCh := make(chan bool)

	go func() {
		defer wg.Done()
		watch(stopCh, "watch")
	}()

	time.Sleep(5 * time.Second)
	stopCh <- true
	wg.Wait()
}

func watch(stopCh chan bool, name string) {

	for {
		select {
		case <-stopCh:
			fmt.Println(name, "copy that, stop")
			return
		default:
			fmt.Println(name, "watching……")
		}
		time.Sleep(1 * time.Second)
	}
}

1.1.11. 优雅的方式

  • 情形一:M个接收者和一个发送者,发送者通过关闭用来传输数据的通道来传递发送结束信号。
  • 情形二:一个接收者和N个发送者,此唯一接收者通过关闭一个额外的信号通道来通知发送者不要再发送数据了。
代码语言:go复制
func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumSenders = 1000

    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})

    // senders
    for i := 0; i < NumSenders; i   {
        go func() {
            for {
                select {
                case <- stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }

    // the receiver
    go func() {
        for value := range dataCh {
            if value == Max-1 {
                fmt.Println("send stop signal to senders.")
                close(stopCh)
                return
            }

            fmt.Println(value)
        }
    }()

    select {
    case <- time.After(time.Hour):
    }
}

这里的 stopCh 就是信号 channel,它本身只有一个 sender,因此可以直接关闭它。senders 收到了关闭信号后,select 分支 “case <- stopCh” 被选中,退出函数,不再发送数据。

在 Go 语言中,对于一个 channel,如果最终没有任何 goroutine 引用它,不管 channel 有没有被关闭,最终都会被 gc 回收。所以,在这种情形下,所谓的优雅地关闭 channel 就是不关闭 channel,让 gc 代劳。

  • 情形三:M个接收者和N个发送者,它们中的任何协程都可以让一个中间调解协程帮忙发出停止数据传送的信号。

M 个 receiver 都向它发送关闭 dataCh 的“请求”,中间人收到第一个请求后,就会直接下达关闭 dataCh 的指令(通过关闭 stopCh,这时就不会发生重复关闭的情况,因为 stopCh 的发送方只有中间人一个)。另外,这里的 N 个 sender 也可以向中间人发送关闭 dataCh 的请求。

代码语言:go复制
func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})

    // It must be a buffered channel.
    toStop := make(chan string, 1)

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i   {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    select {
                    case toStop <- "sender#"   id:
                    default:
                    }
                    return
                }

                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i   {
        go func(id string) {
            for {
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        select {
                        case toStop <- "receiver#"   id:
                        default:
                        }
                        return
                    }

                    fmt.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    select {
    case <- time.After(time.Hour):
    }

}

代码里 toStop 就是中间人的角色,使用它来接收 senders 和 receivers 发送过来的关闭 dataCh 请求。

这里将 toStop 声明成了一个 缓冲型的 channel。假设 toStop 声明的是一个非缓冲型的 channel,那么第一个发送的关闭 dataCh 请求可能会丢失。因为无论是 sender 还是 receiver 都是通过 select 语句来发送请求,如果中间人所在的 goroutine 没有准备好,那 select 语句就不会选中,直接走 default 选项,什么也不做。这样,第一个关闭 dataCh 的请求就会丢失。

如果,我们把 toStop 的容量声明成 Num(senders) Num(receivers),那发送 dataCh 请求的部分可以改成更简洁的形式:

代码语言:go复制
...
toStop := make(chan string, NumReceivers   NumSenders)
...
            value := rand.Intn(Max)
            if value == 0 {
                toStop <- "sender#"   id
                return
            }
...
                if value == Max-1 {
                    toStop <- "receiver#"   id
                    return
                }
...

直接向 toStop 发送请求,因为 toStop 容量足够大,所以不用担心阻塞,自然也就不用 select 语句再加一个 default case 来避免阻塞。

1.1.12.超时控制

代码语言:go复制
select {
  case <- ch:
    // get data from ch
  case <- time.After(2 * time.Second)
    // read data from ch timeout
}

1.1.13.数据结构

代码语言:go复制
type hchan struct {
	qcount   uint   // chan 里元素数量  
    
	dataqsiz uint   	// chan 底层循环数组的长度
    
	buf      unsafe.Pointer  // 指向底层循环数组的指针,只针对有缓冲的 channel
	
	elemsize uint16  // chan 中元素大小
	
	closed   uint32  // chan 是否被关闭的标志
	
	elemtype *_type // chan 中元素类型
	
	sendx    uint   // 已发送元素在循环数组中的索引
	
	recvx    uint   // 已接收元素在循环数组中的索引
	
	recvq    waitq  // 等待接收的 goroutine 队列
	
	sendq    waitq  // 等待发送的 goroutine 队列

	lock mutex   // 保护 hchan 中所有字段
}
  • qcount — Channel 中的元素个数;
  • dataqsiz — Channel 中的循环队列的长度;
  • buf — Channel 的缓冲区数据指针,是个循环链表
  • sendx — Channel 的发送操作处理到的位置;
  • recvx — Channel 的接收操作处理到的位置;

elemsize 和 elemtype 分别表示当前 Channel 能够收发的元素类型和大小;sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 [runtime.waitq](https://draveness.me/golang/tree/runtime.waitq) 表示,链表中所有的元素都是 [runtime.sudog](https://draveness.me/golang/tree/runtime.sudog) 结构:

channel 的创建都是调用的 mallocgc 方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close 方法来进行显示关闭了。

1.1.14环形队列

channel 内部实现了一个环形队列作为其缓冲区,队列的长度是创建 channel 时指定的。

image.pngimage.png
  • dataqsiz 表明了队列长度为6,即可缓存6个元素;
  • buf 指向队列的内存地址;
  • qcount 表示队列中还有两个元素;
  • sendx 表示后续写入的数据存储的位置,取值为 [0, 6);
  • recvx 表示读取数据的位置, 取值为[0, 6)。

1.1.15 等待队列

从 channel 读取数据时,如果没有缓冲区或者缓冲区为空,则当前协程会被阻塞,并被加入 recvq 队列。向 channel 写入数据时,如果没有缓冲区或者缓冲区已满,则当前协程同样会被阻塞,然后加入到 sendq 的队列。处于等待队列中的协程会在其他协程操作 channel 时被唤醒。

1.1.16 发送数据

代码语言:go复制
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果通道为 nil,非阻塞式发送的话直接返回 false,否则将当前协程挂起
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
 
    // 对于非阻塞式发送,如果通道未关闭且没有缓冲空间的话,直接返回 false
    if !block && c.closed == 0 && full(c) {
        return false
    }

    // 加锁,并发安全
    lock(&c.lock)

    // 如果通道关闭了,直接 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // 如果接收队列不为空,直接将要发送的数据发送到队首的 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 对于缓冲区还有空闲的 channel,拷贝数据到缓冲区,维护相关信息
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx  
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount  
        unlock(&c.lock)
        return true
    }

    // 没有缓冲空间时,发送方会挂起,并根据当前 goroutine 构造一个 sudog 结构体添加到 sendq 队列中
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    // 省略被唤醒时部分代码

    return true
}

1.1.17 读数据

代码语言:go复制
// selected 和 received 返回值分别代表是否可被 select 语句命中以及是否读取到了数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果 channel 为 nil,非阻塞式读取直接返回,否则直接挂起
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 非阻塞模式并且没有消息可读(没有缓冲区或者缓冲区为空),如果 channel 未关闭直接返回
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }

        if empty(c) {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    // 加锁
    lock(&c.lock)

    // channel 已关闭并且没有消息可读(没有缓冲区或者缓冲区为空),会接收到零值,typedmemclr 会根据类型清理相应地址的内存
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // 等待发送队列不为空,如果是非缓冲型 channel,直接拷贝发送者的数据,否则接收队首的数据,并将发送者的数据移动到环形队列尾部
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 缓冲型 channel,buf 里有元素,可以正常接收
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx  
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    // 被阻塞的情况,构造一个 sudog 结构体,保存到 channel 的等待接收队列,并将当前 goroutine 挂起
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)

    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // 省略被唤醒时部分代码
    
    return true, !closed
}

1.1.18 创建

创建channel时调用了运行时方法makechan:

代码语言:go复制
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

    // 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
         // 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
        // 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间   hchan必需的空间。
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
        // 元素中包含指针,为hchan和缓冲区分别分配空间
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
	}
	return c
}

makechan的代码逻辑还是比较简单的,首先校验元素类型和缓冲区空间大小,然后创建hchan,分配所需空间。这里有三种情况:当缓冲区大小为0,或者channel中元素大小为0时,只需分配channel必需的空间即可;当channel元素类型不是指针时,则只需要为hchan和缓冲区分配一片连续内存空间,空间大小为缓冲区数组空间加上hchan必需的空间;默认情况,缓冲区包含指针,则需要为hchan和缓冲区分别分配内存。最后更新hchan的其他字段,包括elemsize,elemtype,dataqsiz。

原理

创建channel实际上就是在内存中实例化了一个hchan的结构体,并返回一个ch指针,使用过程中channel在函数之间的传递都是用的这个指针,这就是为什么函数传递中无需使用channel的指针,而直接用channel就行了,因为channel本身就是一个指针。

channel就是用了一个锁。hchan本身包含一个互斥锁mutex

channel中有个缓存buf,是用来缓存数据的

当使用send (ch <- xx)或者recv ( <-ch)的时候,首先要锁住hchan这个结构体。

当channel缓存满了之后会发生什么?这其中的原理是怎样的?

goroutine的阻塞操作,实际上是调用send (ch <- xx)或者recv ( <-ch)的时候主动触发的,具体请看以下内容:

这个时候G1正在正常运行,当再次进行send操作(ch<-1)的时候,会主动调用Go的调度器,让G1等待,并从让出M,让其他G去使用

同时G1也会被抽象成含有G1指针和send元素的sudog结构体保存到hchan的sendq中等待被唤醒。

那么,G1什么时候被唤醒呢?这个时候G2隆重登场。

G2执行了recv操作p := <-ch,于是会发生以下的操作:

G2从缓存队列中取出数据,channel会将等待队列中的G1推出,将G1当时send的数据推到缓存中,然后调用Go的scheduler,唤醒G1,并把G1放到可运行的Goroutine队列中。

select-case 情况

  1. 遍历所有的case项,分四种情况进行判断。
    • 类型是空:直接跳过。
    • 接收类型:先判断发送队列是否为空,不为空,结束循环;再判断缓冲区是否有数据,有数据,结束循环;最后判断通道是否关闭,已关闭,结束循环。
    • 发送类型:先判断通道是否关闭,已关闭,则panic;再判断接收队列是否为空,不为空,结束循环;判断缓冲区是否已经满了,未满,结束循环。
    • default类型:不会结束循环,所以优先级最低。
  2. 在所有case都不满足的情况下,当前goroutine就会进入waiting状态,等待被唤醒。
  3. 唤醒后进行比对,取出对应的case索引即可。

select 基本用法

代码语言:go复制
select {
    case <- c1:
    // 如果c1成功读到数据,则进行该case处理语句
    case c2 <- 1:
    // 如果成功向c2写入数据,则进行该case处理语句
    default:
    // 如果上面都没有成功,则进入default处理流程
}

发送数据和接收数据过程

1、锁定整个通道结构。

2、确定写入。尝试recvq从等待队列中等待goroutine,然后将元素直接写入goroutine。

3、如果recvq为Empty,则确定缓冲区是否可用。如果可用,从当前goroutine复制数据到缓冲区。

4、如果缓冲区已满,则要写入的元素将保存在当前正在执行的goroutine的结构中,并且当前goroutine将在sendq中排队并从运行时挂起。

5、写入完成释放锁。

发送数据时先判断channel类型,如果有缓冲区,判断channel是否还有空间,然后从等待channel中获取等待channel中的接受者,如果取到接收者,则将对象直接传递给接受者,然后将接受者所在的go放入P所在的可运行G队列,发送过程完成,如果未取到接收者,则将发送者enqueue到发送channel,发送者进入阻塞状态,有缓冲的channel需要先判断channel缓冲是否还有空间,如果缓冲空间已满,则将发送者enqueue到发送channel,发送者进入阻塞状态如果缓冲空间未满,则将元素copy到缓冲中,这时发送者就不会进入阻塞状态,最后尝试唤醒等待队列中的一个接受者。

接收channel与发送类似首先也是判断channel的类型,然后如果是有缓冲的channel就判断缓冲中是否有元素,接着从channel中获取接受者,如果取到,则直接从接收者获取元素,并唤醒发送者,本次接收过程完成,如果没有取到接收者,阻塞当前的goroutine并等待发送者唤醒,如果是拥有缓冲的channel需要先判断缓冲中是否有元素,缓冲为空时,阻塞当前goroutine并等待发送者唤醒,缓冲如果不为空,则取出缓冲中的第一个元素,然后尝试唤醒channel中的一个发送者

1、先获取channel全局锁

2、尝试sendq从等待队列中获取等待的goroutine,

3、 如有等待的goroutine,没有缓冲区,取出goroutine并读取数据,然后唤醒这个goroutine,结束读取释放锁。

4、如有等待的goroutine,且有缓冲区(此时缓冲区已满),从缓冲区队首取出数据,再从sendq取出一个goroutine,将goroutine中的数据存入buf队尾,结束读取释放锁。

5、如没有等待的goroutine,且缓冲区有数据,直接读取缓冲区数据,结束读取释放锁。

6、如没有等待的goroutine,且没有缓冲区或缓冲区为空,将当前的goroutine加入recvq排队,进入睡眠,等待被写goroutine唤醒。结束读取释放锁。

1.1.12. 通道总结

channel 常见的异常总结,如下图:

注意: 关闭已经关闭的 channel 也会引发 panic。

channel 为什么是并发安全的呢?是因为 channel 内部使用了互斥锁来保证并发的安全

如何优雅地关闭通道

channel

channel 有哪些应用

定时任务
代码语言:go复制
select {
	case <-time.After(100 * time.Millisecond):
	case <-s.stopc:
		return false
}

等待 100 ms 后,如果 s.stopc 还没有读出数据或者被关闭,就直接结束。这是来自 etcd 源码里的一个例子,这样的写法随处可见。

执行定时任务

代码语言:go复制
func worker() {
	ticker := time.Tick(1 * time.Second)
	for {
		select {
		case <- ticker:
			// 执行定时任务
			fmt.Println("执行 1s 定时任务")
		}
	}
}
解耦生产方和消费方

服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:

代码语言:go复制
func main() {
	taskCh := make(chan int, 100)
	go worker(taskCh)

    // 塞任务
	for i := 0; i < 10; i   {
		taskCh <- i
	}

    // 等待 1 小时 
	select {
	case <-time.After(time.Hour):
	}
}

func worker(taskCh <-chan int) {
	const N = 5
	// 启动 5 个工作协程
	for i := 0; i < N; i   {
		go func(id int) {
			for {
				task := <- taskCh
				fmt.Printf("finish task: %d by worker %dn", task, id)
				time.Sleep(time.Second)
			}
		}(i)
	}
}

5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。

控制并发数

有时需要定时执行几百个任务,例如每天定时按城市来执行一些离线计算的任务。但是并发数又不能太高,因为任务执行过程依赖第三方的一些资源,对请求的速率有限制。这时就可以通过 channel 来控制并发数。

代码语言:go复制
var limit = make(chan int, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    // …………
}

构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。

这里,limit <- 1 放在 func 内部而不是外部,原因是:

如果在外层,就是控制系统 goroutine 的数量,可能会阻塞 for 循环,影响业务逻辑。 limit 其实和逻辑无关,只是性能调优,放在内层和外层的语义不太一样。

还有一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证

Q&A

1. 提前关channel会丢失数据 ?

源码地址

Hchan是channel的主要数据结构,我们关心的 qcount用来表示消息的个数,closed int32标识用来表示chan的开关,1为关闭.

代码语言:go复制
type hchan struct {
  qcount   uint           // total data in the queue
  ...
  closed   uint32
  ...

Closechan是chan.go里的关闭channel的方法,该方法除了将 c.closed 设置为 1。还需要唤醒 recvq和sendq 队列里面的阻塞 goroutine. 如果你不唤醒的化,因为没有生产者send数据,也就无法通过事件来唤醒goroutine,所以这时候需要把他们都唤醒起来,判断是否退出的逻辑。

代码语言:go复制
func closechan(c *hchan) {
    // 如果 channel 为 nil,直接 panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 加锁,如果 channel 已关闭,直接 panic
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    c.closed = 1

    var glist gList

    // 释放等待接收队列中,向需要返回值的接收者返回相应的零值
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 释放等待发送队列,相关的 goroutine 会触发panic
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // ...
}

消费者接收事件相关的逻辑,不是简单的判断closed标志位,而且会判断channel消息个数

代码语言:go复制
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...

    lock(&c.lock)


    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
}

...

测试代码

代码语言:go复制
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    ch := make(chan int, 2000)

    var wg sync.WaitGroup

    go func() {
        for i := 0; i < 200; i   {
            ch <- i
        }
        close(ch)
    }()

    for i := 0; i < 10; i   {
        wg.Add(1)
        go work(&wg, ch)
    }
    wg.Wait()
    fmt.Println("exit")
}

func work(wg *sync.WaitGroup, ch chan int) {
    defer wg.Done()
    for {
        time.Sleep(100 * time.Millisecond)
        select {
        case data, ok := <- ch:
            fmt.Println(data, ok)
        default:
            fmt.Println("not data")
        }

        item, ok := <-ch
        if !ok {
            break
        }
    }
}

参考

深度解密Go语言之channel

channel 是并发安全的吗

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞