例1
编写一个程序,其中两个 goroutine 来回传递一个整数十次。当每个 goroutine 接收到整数时打印。每次通过整数都增加。一旦整数等于 10,立刻终止程序。
代码语言:javascript复制func main() {
share := make(chan int)
// Create the WaitGroup and add a count
// of two, one for each goroutine.
var wg sync.WaitGroup
wg.Add(2)
// Launch two goroutines.
go func() {
goroutine("Bill", share)
wg.Done()
}()
go func() {
goroutine("Joan", share)
wg.Done()
}()
// Start the sharing.
share <- 1
// Wait for the program to finish.
wg.Wait()
}
// goroutine simulates sharing a value.
func goroutine(name string, share chan int) {
for {
// Wait to receive a value.
value, ok := <-share
if !ok {
// If the channel was closed, return.
fmt.Printf("Goroutine %s Downn", name)
return
}
// Display the value.
fmt.Printf("Goroutine %s Inc %dn", name, value)
// Terminate when the value is 10.
if value == 10 {
close(share)
fmt.Printf("Goroutine %s Downn", name)
return
}
// Increment the value and send it
// over the channel.
share <- value 1
}
}
例2
编写一个程序,使用扇出模式同时生成 100 个随机数。让每个 goroutine 生成一个随机数,并通过缓冲channel将该数字返回给主 goroutine。设置缓冲区channel的大小,以便永远不会发送阻塞。不要分配比您需要的更多的缓冲区。让主 goroutine 显示它收到的每个随机数,然后终止程序。
代码语言:javascript复制package main
import (
"fmt"
"math/rand"
"time"
)
const (
goroutines = 100
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Create the buffer channel with a buffer for
// each goroutine to be created.
values := make(chan int, goroutines)
// Iterate and launch each goroutine.
for gr := 0; gr < goroutines; gr {
// Create an anonymous function for each goroutine that
// generates a random number and sends it on the channel.
go func() {
values <- rand.Intn(1000)
}()
}
// Create a variable to be used to track received messages.
// Set the value to the number of goroutines created.
wait := goroutines
// Iterate receiving each value until they are all received.
// Store them in a slice of ints.
var nums []int
for wait > 0 {
nums = append(nums, <-values)
wait--
}
// Print the values in our slice.
fmt.Println(nums)
}
例3
编写一个程序,最多同时生成 100 个随机数。不要发送所有的 100 个值,因为发送/接收的数量是未知的。
代码语言:javascript复制package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
const (
goroutines = 100
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Create the channel for sharing results.
values := make(chan int)
// Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
var wg sync.WaitGroup
wg.Add(goroutines)
// Iterate and launch each goroutine.
for gr := 0; gr < goroutines; gr {
// Create an anonymous function for each goroutine.
go func() {
// Ensure the waitgroup is decremented when this function returns.
defer wg.Done()
// Generate a random number up to 1000.
n := rand.Intn(1000)
// Return early if the number is divisible by 2. n%2 == 0
if n%2 == 0 {
return
}
// Send the odd values through the channel.
values <- n
}()
}
// Create a goroutine that waits for the other goroutines to finish then
// closes the channel.
go func() {
wg.Wait()
close(values)
}()
// Receive from the channel until it is closed.
// Store values in a slice of ints.
var nums []int
for n := range values {
nums = append(nums, n)
}
// Print the values in our slice.
fmt.Printf("Result count: %dn", len(nums))
fmt.Println(nums)
}
例4
编写一个程序,使用work pool同时生成最多 100 个随机数。拒绝偶数值。如果已收集到 100 个奇数,就让协程停止运行。
代码语言:javascript复制package main
import (
"fmt"
"math/rand"
"runtime"
"sync"
)
func main() {
// Create the channel for sharing results.
values := make(chan int)
// Create a channel "shutdown" to tell goroutines when to terminate.
shutdown := make(chan struct{})
// Define the size of the worker pool. Use runtime.GOMAXPROCS(0) to size the pool based on number of processors.
poolSize := runtime.GOMAXPROCS(0)
// Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
var wg sync.WaitGroup
wg.Add(poolSize)
// Create a fixed size pool of goroutines to generate random numbers.
for i := 0; i < poolSize; i {
go func(id int) {
// Start an infinite loop.
for {
// Generate a random number up to 1000.
n := rand.Intn(1000)
// Use a select to either send the number or receive the shutdown signal.
select {
// In one case send the random number.
case values <- n:
fmt.Printf("Worker %d sent %dn", id, n)
// In another case receive from the shutdown channel.
case <-shutdown:
fmt.Printf("Worker %d shutting downn", id)
wg.Done()
return
}
}
}(i)
}
// Create a slice to hold the random numbers.
var nums []int
for i := range values {
// continue the loop if the value was even.
if i%2 == 0 {
fmt.Println("Discarding", i)
continue
}
// Store the odd number.
fmt.Println("Keeping", i)
nums = append(nums, i)
// break the loop once we have 100 results.
if len(nums) == 100 {
break
}
}
// Send the shutdown signal by closing the channel.
fmt.Println("Receiver sending shutdown signal")
close(shutdown)
// Wait for the Goroutines to finish.
wg.Wait()
// Print the values in our slice.
fmt.Printf("Result count: %dn", len(nums))
fmt.Println(nums)
}
例5
使用2个goroutine往n大小的通道中模拟任务生产。select中的case哪个可以读取则打印出数据,每隔5秒我们来看一下生产的消息还有多少没有被打印过。
代码语言:javascript复制func main() {
var t1 = makeTask("adoJob", 1000)
var t2 = makeTask("xs25Job", 500)
var tick = time.Tick(time.Second * 5)
for {
select {
case task:=<-t1:
log.Println(task)
case task:=<-t2:
log.Println(task)
case <-tick:
log.Println(fmt.Sprintf("队列挤压数量t1:%v个,t2:%v个", len(t1), len(t2)))
}
time.Sleep(time.Second * 1)
}
}
// 生产数据
func makeTask(queueName string, n int) chan string {
ch := make(chan string, n)
go func() {
i := 1
for {
// 假设生产任务占用时间
time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
ch <- fmt.Sprintf("%s,生产数据 %d", queueName, i)
i
}
}()
return ch
}
每隔5秒钟有可能执行不到这个case。原因是多个case都满足时随机执行其中一个。
代码语言:javascript复制2023/03/22 17:22:34 adoJob,生产数据 21
2023/03/22 17:22:35 xs25Job,生产数据 22
2023/03/22 17:22:36 队列挤压数量t1:29个,t2:33个
2023/03/22 17:22:37 xs25Job,生产数据 23
2023/03/22 17:22:38 xs25Job,生产数据 24
2023/03/22 17:22:39 adoJob,生产数据 22
2023/03/22 17:22:40 xs25Job,生产数据 25
2023/03/22 17:22:41 adoJob,生产数据 23
2023/03/22 17:22:42 xs25Job,生产数据 26
2023/03/22 17:22:43 xs25Job,生产数据 27
2023/03/22 17:22:44 队列挤压数量t1:35个,t2:34个
2023/03/22 17:22:45 xs25Job,生产数据 28
2023/03/22 17:22:46 adoJob,生产数据 24
2023/03/22 17:22:47 队列挤压数量t1:37个,t2:37个
例6
使用goroutine与无缓冲通道做一个消费端,将代码再改进一下。
代码语言:javascript复制func main() {
var t1 = makeTask("adoJob", 1000)
var t2 = makeTask("xs25Job", 1000)
var allTask []string //因为我想只做一个消费端,将2个生产端生产出来的消费都扔到一起
var tick = time.Tick(time.Second * 5) //每隔一段时间报告队列积压情况
var workerCh = worker()
for {
var taskInfo string //具体任务
var ch chan<- string
if len(allTask) > 0 {
taskInfo = allTask[0] //从所有任务中取出每一个
ch = workerCh
}
select {
case task := <-t1:
allTask = append(allTask, task)
case task := <-t2:
allTask = append(allTask, task)
case ch <- taskInfo: //任务详情写入到要消费工作中
allTask = allTask[1:]
case <-tick:
log.Println("队列挤压数量", len(allTask))
}
}
}
//生产数据
func makeTask(queueName string, n int) chan string {
ch := make(chan string, n)
go func() {
i := 1
for {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) //假设生产任务占用时间
ch <- fmt.Sprintf("%s,生产数据 %d", queueName, i)
i
}
}()
return ch
}
//消费数据
func worker() chan<- string {
ch := make(chan string) //无缓冲通道
go func(tasks chan string) {
for t := range tasks {
time.Sleep(time.Second * 1) //假设我们每次消费任务需要花费1秒钟
log.Printf("消费任务: %s n", t)
}
}(ch)
return ch
}
例7
解耦生产方和消费方
服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:
代码语言:javascript复制func main() {
taskCh := make(chan int, 100)
go worker(taskCh)
// 塞任务
for i := 0; i < 10; i {
taskCh <- i
}
// 等待 1 小时
select {
case <-time.After(time.Hour):
}
}
func worker(taskCh <-chan int) {
const N = 5
// 启动 5 个工作协程
for i := 0; i < N; i {
go func(id int) {
for {
task := <- taskCh
fmt.Printf("finish task: %d by worker %dn", task, id)
time.Sleep(time.Second)
}
}(i)
}
}
5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。
例8
控制并发数
代码语言:javascript复制var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}
构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。
一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。
Go Channel 应用模式